303 lines
9.7 KiB
Python
303 lines
9.7 KiB
Python
# plugins/media/routes.py
|
|
|
|
import os
|
|
import uuid
|
|
from werkzeug.utils import secure_filename
|
|
from datetime import datetime
|
|
from PIL import Image, ExifTags
|
|
|
|
from flask import (
|
|
Blueprint, request, redirect, url_for,
|
|
flash, send_from_directory, current_app,
|
|
jsonify, abort
|
|
)
|
|
from flask_login import login_required, current_user
|
|
|
|
from app import db
|
|
from .models import Media, ImageHeart, FeaturedImage
|
|
from plugins.plant.models import Plant
|
|
|
|
# Blueprint for the media plugin: every route below is mounted under
# "/media" and templates are looked up in this package's "templates" folder.
bp = Blueprint("media", __name__, url_prefix="/media", template_folder="templates")
|
|
|
|
|
|
# ─── Context Processor ─────────────────────────────────────────────────────────
|
|
@bp.app_context_processor
def utility_processor():
    """Make ``generate_image_url`` available in template contexts app-wide."""
    return {"generate_image_url": generate_image_url}
|
|
|
|
|
|
# ─── Helpers & Config ─────────────────────────────────────────────────────────
|
|
def allowed_file(filename):
    """Return True when *filename* carries an extension the app accepts.

    The extension is the text after the last dot, compared
    case-insensitively against the ``ALLOWED_EXTENSIONS`` config value
    (falling back to a default set of common image extensions).

    A missing filename (``None`` or ``""``) is rejected up front instead of
    raising ``TypeError`` on the dot lookup.
    """
    if not filename:
        return False

    # rpartition yields an empty separator when there is no dot at all,
    # in which case the name has no extension.
    _, dot, ext = filename.rpartition(".")
    ext = ext.lower() if dot else ""

    allowed = current_app.config.get(
        "ALLOWED_EXTENSIONS",
        {"png", "jpg", "jpeg", "gif", "webp"},
    )
    return ext in allowed
|
|
|
|
|
|
def get_upload_path(plugin: str, related_id: int):
    """
    Create (if needed) and return the upload directory for an item.

    Returns an ``(absolute_dir, subdir)`` tuple where ``subdir`` is
    ``{plugin}s/{related_id}/YYYY/MM/DD`` (date taken from
    ``datetime.utcnow()``) and ``absolute_dir`` is that sub-path joined
    onto the configured ``UPLOAD_FOLDER``.
    """
    today = datetime.utcnow()
    segments = (
        f"{plugin}s",
        str(related_id),
        str(today.year),
        f"{today.month:02}",
        f"{today.day:02}",
    )
    subdir = "/".join(segments)
    abs_dir = os.path.join(current_app.config["UPLOAD_FOLDER"], subdir)
    os.makedirs(abs_dir, exist_ok=True)
    return abs_dir, subdir
|
|
|
|
|
|
def _strip_exif(image: Image.Image) -> Image.Image:
    """
    Return *image* rotated according to its EXIF Orientation tag.

    Orientation values 3, 6 and 8 trigger ``rotate()`` calls of 180, 270
    and 90 degrees respectively (with ``expand=True``). Any other value,
    missing EXIF data, or an error while reading it leaves the image
    untouched.
    """
    # EXIF orientation value -> angle handed to Image.rotate().
    angle_for_orientation = {3: 180, 6: 270, 8: 90}
    try:
        exif_data = image._getexif()

        # Find the numeric tag id whose name is "Orientation".
        orientation_tag = None
        for tag_id, tag_name in ExifTags.TAGS.items():
            if tag_name == "Orientation":
                orientation_tag = tag_id
                break

        if exif_data and orientation_tag in exif_data:
            angle = angle_for_orientation.get(exif_data[orientation_tag])
            if angle is not None:
                image = image.rotate(angle, expand=True)
    except Exception:
        # Best effort: anything that goes wrong returns the image as-is.
        pass
    return image
|
|
|
|
|
|
def _process_upload_file(file, uploader_id, plugin='', related_id=0, plant_id=None, growlog_id=None, caption=None):
    """Save an uploaded file to disk and build its Media record.

    The file is written to ``{UPLOAD_FOLDER}/{uploader_id}/YYYY/MM/DD/``
    under a name made of a random hex id plus the sanitized original
    filename.

    Args:
        file: Uploaded file object exposing ``filename`` and ``save(path)``.
        uploader_id: Id of the uploading user; also the first path segment.
        plugin: Stored on the record as ``plugin``.
        related_id: Stored on the record as ``related_id``.
        plant_id: Stored on the record as ``plant_id``.
        growlog_id: Stored on the record as ``growlog_id``.
        caption: Stored on the record as ``caption``.

    Returns:
        A new ``Media`` instance. It is NOT added to the DB session here;
        the caller is responsible for persisting it.
    """
    now = datetime.utcnow()
    date_path = now.strftime('%Y/%m/%d')

    # Random prefix keeps stored names unique even for identical uploads.
    unique_id = uuid.uuid4().hex
    # `file.filename` may be None; secure_filename() needs a string.
    secure_name = secure_filename(file.filename or "")
    filename = f"{unique_id}_{secure_name}"

    storage_path = os.path.join(
        current_app.config['UPLOAD_FOLDER'],
        str(uploader_id),
        date_path,
    )
    os.makedirs(storage_path, exist_ok=True)
    file.save(os.path.join(storage_path, filename))

    # Stored relative to UPLOAD_FOLDER with no leading slash. A leading "/"
    # makes os.path.join(UPLOAD_FOLDER, file_url) discard UPLOAD_FOLDER,
    # which breaks the delete/rotate/serve code paths that join the two.
    file_url = f"{uploader_id}/{date_path}/{filename}"

    return Media(
        plugin=plugin,
        related_id=related_id,
        filename=filename,
        uploaded_at=now,
        uploader_id=uploader_id,
        caption=caption,
        plant_id=plant_id,
        growlog_id=growlog_id,
        created_at=now,
        file_url=file_url,
    )
|
|
|
|
|
|
# ─── Exposed Utilities ─────────────────────────────────────────────────────────
|
|
def save_media_file(file, user_id, **ctx):
    """Store *file* for *user_id* and return the resulting Media record.

    Extra keyword arguments are forwarded unchanged to
    ``_process_upload_file``.
    """
    media = _process_upload_file(file, user_id, **ctx)
    return media
|
|
|
|
|
|
def delete_media_file(media: Media):
    """
    Remove the media's file from disk (if present) and delete its DB record.

    The on-disk location is ``UPLOAD_FOLDER`` joined with ``media.file_url``.
    A file that is already missing is not an error; the DB row is deleted
    and the session committed either way.
    """
    base = current_app.config["UPLOAD_FOLDER"]

    # file_url is relative to UPLOAD_FOLDER. Strip any leading "/" first:
    # os.path.join() discards `base` when the second part is absolute.
    rel_path = (media.file_url or "").lstrip("/")
    if rel_path:
        try:
            os.remove(os.path.join(base, rel_path))
        except FileNotFoundError:
            # Already gone: still drop the DB row below.
            pass

    db.session.delete(media)
    db.session.commit()
|
|
|
|
|
|
def rotate_media_file(media: Media):
    """
    Rotate the image on disk by -90° and save it back to the same path.

    The on-disk location is ``UPLOAD_FOLDER`` joined with ``media.file_url``.
    Errors from opening or saving the image propagate to the caller.
    """
    base = current_app.config["UPLOAD_FOLDER"]

    # Strip any leading "/" so os.path.join() does not discard `base`.
    path = os.path.join(base, (media.file_url or "").lstrip("/"))

    # Build the rotated copy while the source is open, then close the source
    # handle before overwriting the same file.
    with Image.open(path) as img:
        rotated = img.rotate(-90, expand=True)
    rotated.save(path)

    db.session.commit()
|
|
|
|
|
|
def generate_image_url(media: Media):
    """
    Given a Media instance (or None), return its public URL
    or a placeholder if no media.

    The URL targets the ``media.media_file`` route. The placeholder size
    comes from the ``STANDARD_IMG_SIZE`` config value, default (300, 200).
    """
    # Drop any leading "/" so the generated URL does not contain "//"
    # between the route prefix and the stored path.
    rel_path = (media.file_url or "").lstrip("/") if media else ""
    if rel_path:
        return url_for("media.media_file", filename=rel_path)

    w, h = current_app.config.get("STANDARD_IMG_SIZE", (300, 200))
    return f"https://placehold.co/{w}x{h}"
|
|
|
|
|
|
# ─── Routes ────────────────────────────────────────────────────────────────────
|
|
@bp.route("/", methods=["GET"])
def media_index():
    """Redirect the blueprint root to the ``core_ui.home`` endpoint."""
    home_url = url_for("core_ui.home")
    return redirect(home_url)
|
|
|
|
|
|
@bp.route("/<plugin>/<filename>")
def serve(plugin, filename):
    """
    Stream uploaded media by plugin & filename, enforcing Media lookup.

    Responds 404 when no Media row has a ``file_url`` of the form
    ``{plugin}s/<anything>/{filename}``.
    """
    def _escape_like(value):
        # Neutralize LIKE metacharacters coming from the URL.
        return (
            value.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    # filter_by() performs an equality test, so a "%" in the value is never
    # treated as a wildcard; an explicit LIKE is required for the lookup.
    pattern = f"{_escape_like(plugin)}s/%/{_escape_like(filename)}"
    m = Media.query.filter(
        Media.file_url.like(pattern, escape="\\")
    ).first_or_404()

    # reconstruct disk path
    # NOTE(review): the id segment uses plant_id/growlog_id while
    # get_upload_path() builds it from related_id — confirm they agree.
    date_path = m.uploaded_at.strftime("%Y/%m/%d")
    disk_dir = os.path.join(
        current_app.config["UPLOAD_FOLDER"],
        f"{plugin}s",
        str(m.plant_id or m.growlog_id),
        date_path,
    )
    return send_from_directory(disk_dir, filename)
|
|
|
|
|
|
@bp.route("/files/<path:filename>")
def media_file(filename):
    """
    Serve a file from ``UPLOAD_FOLDER`` by its path relative to that folder.

    Responds 404 when the resolved path falls outside ``UPLOAD_FOLDER``.
    """
    base = current_app.config["UPLOAD_FOLDER"]

    # Compare absolute paths on both sides; comparing a relative path with
    # abspath(base) rejects every request when UPLOAD_FOLDER is relative.
    root = os.path.abspath(base)
    full = os.path.abspath(os.path.join(root, filename))

    # Require the separator after the root so a sibling directory such as
    # "<root>_other" does not pass the prefix test.
    if not full.startswith(os.path.join(root, "")):
        abort(404)
    return send_from_directory(base, filename)
|
|
|
|
|
|
@bp.route("/<filename>")
def media_public(filename):
    """
    Serve a media file looked up by its bare filename.

    Responds 404 when no Media row's ``file_url`` has *filename* as its last
    path segment, or when the resolved path falls outside ``UPLOAD_FOLDER``.
    """
    base = current_app.config["UPLOAD_FOLDER"]

    # Match the whole last path segment only. A plain endswith(filename)
    # would let "g" match every "*.jpg", and "%"/"_" from the URL would act
    # as LIKE wildcards without autoescape.
    m = Media.query.filter(
        (Media.file_url == filename)
        | Media.file_url.endswith(f"/{filename}", autoescape=True)
    ).first_or_404()

    # Strip any leading "/" so os.path.join() does not discard the root.
    rel_path = m.file_url.lstrip("/")
    root = os.path.abspath(base)
    full = os.path.abspath(os.path.join(root, rel_path))
    if not full.startswith(os.path.join(root, "")):
        abort(404)
    return send_from_directory(base, rel_path)
|
|
|
|
|
|
@bp.route("/heart/<int:media_id>", methods=["POST"])
@login_required
def toggle_heart(media_id):
    """
    Toggle the current user's heart on a media item.

    Returns JSON ``{"status": "unhearted"}`` when an existing heart was
    removed, or ``{"status": "hearted"}`` when a new one was created.
    Responds 404 when the media item does not exist.
    """
    # Refuse to create hearts that point at a non-existent media row.
    Media.query.get_or_404(media_id)

    existing = ImageHeart.query.filter_by(
        user_id=current_user.id, media_id=media_id
    ).first()

    if existing:
        db.session.delete(existing)
        status = "unhearted"
    else:
        db.session.add(ImageHeart(user_id=current_user.id, media_id=media_id))
        status = "hearted"

    db.session.commit()
    return jsonify({"status": status})
|
|
|
|
|
|
@bp.route("/add/<string:plant_uuid>", methods=["POST"])
@login_required
def add_media(plant_uuid):
    """
    Upload a single file (form field ``file``) and attach it to a plant.

    Responds 404 for an unknown plant UUID. On an invalid or missing file an
    error is flashed and nothing is stored. In both outcomes the user is
    redirected to the referrer, or to the plant edit page if there is none.
    """
    plant = Plant.query.filter_by(uuid=plant_uuid).first_or_404()
    file = request.files.get("file")

    if not file or not allowed_file(file.filename):
        flash("Invalid or missing file.", "danger")
        return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))

    media = _process_upload_file(
        file=file,
        uploader_id=current_user.id,
        plugin="plant",
        related_id=plant.id,
        # bulk_delete_media() looks media up by plant_id, so set it here.
        plant_id=plant.id,
    )
    # _process_upload_file() only builds the record; persist it here,
    # otherwise the success message is shown with nothing in the database.
    db.session.add(media)
    db.session.commit()

    flash("Media uploaded successfully.", "success")
    return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))
|
|
|
|
|
|
@bp.route("/feature/<string:context>/<int:context_id>/<int:media_id>", methods=["POST"])
@login_required
def set_featured_image(context, context_id, media_id):
    """
    Make a media item the featured image for ``(context, context_id)``.

    Only the uploader or a user whose role is "admin" may do this; anyone
    else receives a 403 JSON error. Existing FeaturedImage rows for the same
    context are deleted and replaced. When the context is "plant", the
    plant's ``featured_media_id`` is updated as well.
    """
    media = Media.query.get_or_404(media_id)

    is_uploader = media.uploader_id == current_user.id
    if not is_uploader and current_user.role != "admin":
        return jsonify({"error": "Not authorized"}), 403

    # Clear whatever was featured for this context before adding the new row.
    previous = FeaturedImage.query.filter_by(context=context, context_id=context_id)
    previous.delete()

    db.session.add(
        FeaturedImage(
            media_id=media.id,
            context=context,
            context_id=context_id,
            is_featured=True,
        )
    )

    if context == "plant":
        Plant.query.get_or_404(context_id).featured_media_id = media.id

    db.session.commit()
    payload = {"status": "success", "media_id": media.id}
    return jsonify(payload)
|
|
|
|
|
|
@bp.route("/delete/<int:media_id>", methods=["POST"])
@login_required
def delete_media(media_id):
    """
    Delete one media item (file and DB row).

    Only the uploader or a user whose role is "admin" may delete; others get
    an error flash and a redirect. After deletion the user is redirected to
    the referrer, else the owning plant's edit page, else the home page.
    """
    media = Media.query.get_or_404(media_id)
    if media.uploader_id != current_user.id and current_user.role != "admin":
        flash("Not authorized to delete this media.", "danger")
        return redirect(request.referrer or url_for("core_ui.home"))

    # Read the plant UUID BEFORE deleting: once the row is deleted and
    # committed, `media.plant` can no longer be loaded, and it may also be
    # None for media not attached to a plant.
    plant = getattr(media, "plant", None)
    plant_uuid = plant.uuid if plant is not None else None

    delete_media_file(media)
    flash("Media deleted.", "success")

    if request.referrer:
        return redirect(request.referrer)
    if plant_uuid is not None:
        return redirect(url_for("plant.edit", uuid_val=plant_uuid))
    return redirect(url_for("core_ui.home"))
|
|
|
|
|
|
@bp.route("/bulk_delete/<string:plant_uuid>", methods=["POST"])
@login_required
def bulk_delete_media(plant_uuid):
    """
    Delete several media items belonging to one plant.

    Ids are read from the ``delete_ids`` form list. Items that do not belong
    to the plant, or that the current user neither uploaded nor may delete
    as "admin", are silently skipped. Flashes the number actually deleted.
    """
    plant = Plant.query.filter_by(uuid=plant_uuid).first_or_404()

    # Form values are untrusted strings: keep only integers, de-duplicated
    # in submission order, so bad input never reaches the query.
    media_ids = []
    for raw in request.form.getlist("delete_ids"):
        try:
            mid = int(raw)
        except (TypeError, ValueError):
            continue
        if mid not in media_ids:
            media_ids.append(mid)

    deleted = 0
    for mid in media_ids:
        m = Media.query.filter_by(id=mid, plant_id=plant.id).first()
        if m and (m.uploader_id == current_user.id or current_user.role == "admin"):
            delete_media_file(m)
            deleted += 1

    flash(f"{deleted} image(s) deleted.", "success")
    return redirect(request.referrer or url_for("plant.edit", uuid_val=plant_uuid))
|
|
|
|
|
|
@bp.route("/rotate/<int:media_id>", methods=["POST"])
@login_required
def rotate_media(media_id):
    """
    Rotate one media item's image file on disk.

    Only the uploader or a user whose role is "admin" may rotate; others get
    an error flash and a redirect. A rotation failure is flashed to the user
    and logged. The user is then redirected to the referrer, else the owning
    plant's edit page, else the home page.
    """
    media = Media.query.get_or_404(media_id)
    if media.uploader_id != current_user.id and current_user.role != "admin":
        flash("Not authorized to rotate this media.", "danger")
        return redirect(request.referrer or url_for("core_ui.home"))

    try:
        rotate_media_file(media)
    except Exception as e:
        # Keep the traceback in the app log; the flash only shows the message.
        current_app.logger.exception("Failed to rotate media %s", media_id)
        flash(f"Failed to rotate image: {e}", "danger")
    else:
        flash("Image rotated successfully.", "success")

    if request.referrer:
        return redirect(request.referrer)
    # Media without a plant must not crash the fallback redirect.
    plant = getattr(media, "plant", None)
    if plant is not None:
        return redirect(url_for("plant.edit", uuid_val=plant.uuid))
    return redirect(url_for("core_ui.home"))
|