Files
natureinpots_community/plugins/media/routes.py
2025-06-26 05:21:21 -05:00

388 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import uuid
import io
import traceback
import tempfile
import logging
from datetime import datetime
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
from flask import (
Blueprint, request, redirect, url_for,
flash, send_from_directory, current_app,
jsonify, abort
)
from flask_login import login_required, current_user
from PIL import Image, ExifTags
from app import db
from .models import Media, ImageHeart, FeaturedImage
bp = Blueprint(
"media",
__name__,
url_prefix="/media",
template_folder="templates"
)
# ─── Context Processor ─────────────────────────────────────────────────────────
@bp.app_context_processor
def utility_processor():
return dict(generate_image_url=generate_image_url)
# ─── Helpers & Config ─────────────────────────────────────────────────────────
def allowed_file(filename):
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
return ext in current_app.config.get(
"ALLOWED_EXTENSIONS",
{"png", "jpg", "jpeg", "gif", "webp"}
)
def get_upload_path(plugin: str, related_id: int):
"""
Return (absolute_dir, subdir) where uploads are stored:
<UPLOAD_FOLDER>/<plugin>/<related_id>/
"""
base = current_app.config["UPLOAD_FOLDER"]
subdir = os.path.join(plugin, str(related_id))
abs_dir = os.path.join(base, subdir)
os.makedirs(abs_dir, exist_ok=True)
return abs_dir, subdir
def _strip_exif(image: Image.Image) -> Image.Image:
try:
exif = image._getexif()
orient_key = next(
(k for k, v in ExifTags.TAGS.items() if v == "Orientation"),
None
)
if exif and orient_key in exif:
o = exif[orient_key]
if o == 3:
image = image.rotate(180, expand=True)
elif o == 6:
image = image.rotate(270, expand=True)
elif o == 8:
image = image.rotate(90, expand=True)
except Exception:
pass
return image
def _process_upload_file(
file: FileStorage,
uploader_id: int,
plugin: str = '',
related_id: int = 0,
plant_id=None,
growlog_id=None,
caption=None
) -> Media:
"""
Handles saving an uploaded file under:
<UPLOAD_FOLDER>/<plugin>/<related_id>/<uuid>.<ext>
and returns a new Media object (not yet committed).
"""
# 1) Unique filename (uuid + original extension)
uid = str(uuid.uuid4()).replace('-', '')
_, ext = os.path.splitext(file.filename)
filename = f"{uid}{ext.lower()}"
# 2) Build the folder
abs_dir, subdir = get_upload_path(plugin, related_id)
# 3) Save to disk
full_path = os.path.join(abs_dir, filename)
file.save(full_path)
# 4) Record the relative URL fragment (for lookup)
file_url = f"{subdir}/{filename}"
# 5) Build the Media row
now = datetime.utcnow()
media = Media(
plugin=plugin,
related_id=related_id,
filename=filename,
uploaded_at=now,
uploader_id=uploader_id,
caption=caption,
plant_id=plant_id,
growlog_id=growlog_id,
created_at=now,
file_url=file_url
)
return media
# ─── Exposed Utilities ─────────────────────────────────────────────────────────
def save_media_file(file, user_id, **ctx):
return _process_upload_file(file, user_id, **ctx)
def delete_media_file(media: Media):
base = current_app.config["UPLOAD_FOLDER"]
full = os.path.normpath(os.path.join(base, media.file_url))
if os.path.exists(full):
os.remove(full)
db.session.delete(media)
db.session.commit()
def rotate_media_file(media: Media):
base = current_app.config["UPLOAD_FOLDER"]
full = os.path.normpath(os.path.join(base, media.file_url))
with Image.open(full) as img:
img.rotate(-90, expand=True).save(full)
db.session.commit()
def generate_image_url(media: Media):
"""
Given a Media instance (or None), return its public URL
under our new schema, or a placeholder if no media.
"""
if media and media.file_url:
# use singular context
return url_for(
"media.serve_context_media",
context=media.plugin,
context_id=media.related_id,
filename=media.filename
)
# fallback
w, h = current_app.config.get("STANDARD_IMG_SIZE", (300, 200))
return f"https://placehold.co/{w}x{h}"
@bp.route("/<context>/<int:context_id>/<filename>")
def serve_context_media(context, context_id, filename):
"""
Serve files saved under:
<UPLOAD_FOLDER>/<plugin>/<context_id>/<filename>
Accepts both singular and trailing-'s' contexts:
/media/plant/1/foo.jpg OR /media/plants/1/foo.jpg
"""
# — determine plugin name (always singular) —
valid = {"user", "plant", "growlog", "vendor"}
if context in valid:
plugin = context
elif context.endswith("s") and context[:-1] in valid:
plugin = context[:-1]
else:
logging.debug(f"Invalid context '{context}' in URL")
abort(404)
# — build filesystem path —
base = current_app.config["UPLOAD_FOLDER"]
directory = os.path.join(base, plugin, str(context_id))
full_path = os.path.join(directory, filename)
# — Debug log what were about to do —
logging.debug(f"[serve_context_media] plugin={plugin!r}, "
f"context_id={context_id!r}, filename={filename!r}")
logging.debug(f"[serve_context_media] checking DB for media row…")
logging.debug(f"[serve_context_media] filesystem path = {full_path!r}, exists? {os.path.exists(full_path)}")
# — Check the DB row (but dont abort if missing) —
media = Media.query.filter_by(
plugin=plugin,
related_id=context_id,
filename=filename
).first()
if not media:
logging.warning(f"[serve_context_media] no Media DB row for "
f"{plugin}/{context_id}/{filename!r}, "
"will try serving from disk anyway")
# — If the file exists on disk, serve it — otherwise 404 —
if os.path.exists(full_path):
return send_from_directory(directory, filename)
logging.error(f"[serve_context_media] file not found on disk: {full_path!r}")
abort(404)
# ─── Legacy / Other Routes (you can leave these for backward compatibility) ────
@bp.route("/", methods=["GET"])
def media_index():
return redirect(url_for("core_ui.home"))
@bp.route("/<plugin>/<filename>")
def serve(plugin, filename):
# optional legacy support
m = Media.query.filter_by(file_url=f"{plugin}s/%/{filename}").first_or_404()
date_path = m.uploaded_at.strftime("%Y/%m/%d")
disk_dir = os.path.join(
current_app.config["UPLOAD_FOLDER"],
f"{plugin}s",
str(m.plant_id or m.growlog_id),
date_path
)
return send_from_directory(disk_dir, filename)
@bp.route("/<filename>")
def media_public(filename):
base = current_app.config["UPLOAD_FOLDER"]
m = Media.query.filter(Media.file_url.endswith(filename)).first_or_404()
full = os.path.normpath(os.path.join(base, m.file_url))
if not full.startswith(os.path.abspath(base)):
abort(404)
return send_from_directory(base, m.file_url)
@bp.route("/heart/<int:media_id>", methods=["POST"])
@login_required
def toggle_heart(media_id):
existing = ImageHeart.query.filter_by(
user_id=current_user.id, media_id=media_id
).first()
if existing:
db.session.delete(existing)
db.session.commit()
return jsonify({"status": "unhearted"})
heart = ImageHeart(user_id=current_user.id, media_id=media_id)
db.session.add(heart)
db.session.commit()
return jsonify({"status": "hearted"})
@bp.route("/add/<string:plant_uuid>", methods=["POST"])
@login_required
def add_media(plant_uuid):
plant = Plant.query.filter_by(uuid=plant_uuid).first_or_404()
file = request.files.get("file")
if not file or not allowed_file(file.filename):
flash("Invalid or missing file.", "danger")
return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))
_process_upload_file(
file=file,
uploader_id=current_user.id,
plugin="plant",
related_id=plant.id
)
flash("Media uploaded successfully.", "success")
return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))
@bp.route("/<context>/<int:context_id>/<filename>")
def media_file(context, context_id, filename):
# your existing serve_context_media logic here
# (unchanged)
from flask import current_app, send_from_directory
import os
valid = {"user", "plant", "growlog", "vendor"}
if context in valid:
plugin = context
elif context.endswith("s") and context[:-1] in valid:
plugin = context[:-1]
else:
abort(404)
media = Media.query.filter_by(
plugin=plugin,
related_id=context_id,
filename=filename
).first_or_404()
base = current_app.config["UPLOAD_FOLDER"]
directory = os.path.join(base, plugin, str(context_id))
return send_from_directory(directory, filename)
@bp.route('/featured/<context>/<int:context_id>/<int:media_id>', methods=['POST'])
def set_featured_image(context, context_id, media_id):
"""
Singleselect “featured” toggle for any plugin (plants, grow_logs, etc).
"""
# normalize to singular plugin name (matches Media.plugin & FeaturedImage.context)
valid = {'plant', 'growlog', 'user', 'vendor'}
if context in valid:
plugin_name = context
elif context.endswith('s') and context[:-1] in valid:
plugin_name = context[:-1]
else:
abort(404)
# must own that media row
media = Media.query.filter_by(
plugin=plugin_name,
related_id=context_id,
id=media_id
).first_or_404()
# clear out any existing featured rows
FeaturedImage.query.filter_by(
context=plugin_name,
context_id=context_id
).delete()
# insert new featured row
fi = FeaturedImage(
media_id=media.id,
context=plugin_name,
context_id=context_id,
is_featured=True
)
db.session.add(fi)
db.session.commit()
# Redirect back with a flash instead of JSON
flash("Featured image updated.", "success")
return redirect(request.referrer or url_for("core_ui.home"))
@bp.route("/delete/<int:media_id>", methods=["POST"])
@login_required
def delete_media(media_id):
media = Media.query.get_or_404(media_id)
if media.uploader_id != current_user.id and current_user.role != "admin":
flash("Not authorized to delete this media.", "danger")
return redirect(request.referrer or url_for("core_ui.home"))
delete_media_file(media)
flash("Media deleted.", "success")
return redirect(request.referrer or url_for("plant.edit", uuid_val=media.plant.uuid))
@bp.route("/bulk_delete/<string:plant_uuid>", methods=["POST"])
@login_required
def bulk_delete_media(plant_uuid):
plant = Plant.query.filter_by(uuid=plant_uuid).first_or_404()
media_ids = request.form.getlist("delete_ids")
deleted = 0
for mid in media_ids:
m = Media.query.filter_by(id=mid, plant_id=plant.id).first()
if m and (m.uploader_id == current_user.id or current_user.role == "admin"):
delete_media_file(m)
deleted += 1
flash(f"{deleted} image(s) deleted.", "success")
return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))
@bp.route("/rotate/<int:media_id>", methods=["POST"])
@login_required
def rotate_media(media_id):
media = Media.query.get_or_404(media_id)
if media.uploader_id != current_user.id and current_user.role != "admin":
flash("Not authorized to rotate this media.", "danger")
return redirect(request.referrer or url_for("core_ui.home"))
try:
rotate_media_file(media)
flash("Image rotated successfully.", "success")
except Exception as e:
flash(f"Failed to rotate image: {e}", "danger")
return redirect(request.referrer or url_for("plant.edit", uuid_val=media.plant.uuid))