# plugins/utility/routes.py
"""Utility routes: ZIP/CSV import, direct media upload, data export, QR labels."""

# Standard library
import csv
import difflib
import io
import os
import re
import shutil
import tempfile
import traceback
import uuid
import zipfile
from datetime import datetime

# Third-party
from flask import (
    Blueprint, request, render_template, redirect, flash,
    session, url_for, send_file, current_app
)
from flask_login import login_required, current_user
from flask_wtf.csrf import generate_csrf
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage

# Application
from app import db
from app.neo4j_utils import get_neo4j_handler

# Plugins
from plugins.plant.models import (
    Plant,
    PlantCommonName,
    PlantScientificName,
    PlantOwnershipLog,
)
from plugins.media.models import Media
from plugins.media.routes import _process_upload_file
from plugins.utility.models import ImportBatch

bp = Blueprint(
    'utility',
    __name__,
    template_folder='templates',
    url_prefix='/utility'
)


@bp.route("/", methods=["GET"])
@login_required
def index():
    """Landing page for the utility plugin: redirect to the upload form."""
    return redirect(url_for("utility.upload"))


# ────────────────────────────────────────────────────────────────────────────────
# Required headers for the sub-app export ZIP.  export_data() below writes these
# exact header rows, and the ZIP import flow validates against them strictly.
PLANT_HEADERS = [
    "UUID", "Type", "Name", "Scientific Name",
    "Vendor Name", "Price", "Mother UUID", "Notes",
    "Short ID"
]
MEDIA_HEADERS = [
    "Plant UUID", "Image Path", "Uploaded At", "Source Type"
]

# Headers required by the standalone CSV review flow.
REQUIRED_HEADERS = {"uuid", "plant_type", "name", "scientific_name", "mother_uuid"}


def _handle_zip_import(file):
    """Import a full sub-app export ZIP (plants.csv + media.csv + images/).

    Validates the archive and its CSV headers, de-duplicates by the export_id
    found in metadata.txt, inserts plants + ownership logs + media for the
    current user, and mirrors plant nodes/lineage into Neo4j.

    Returns a redirect response back to the upload page in all cases.
    """
    # Persist the upload to disk so zipfile can seek it.
    tmp_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    file.save(tmp_zip.name)
    tmp_zip.close()

    try:
        z = zipfile.ZipFile(tmp_zip.name)
    except zipfile.BadZipFile:
        os.remove(tmp_zip.name)
        flash("Uploaded file is not a valid ZIP.", "danger")
        return redirect(request.url)

    tmpdir = None
    try:
        names = z.namelist()
        if "plants.csv" not in names or "media.csv" not in names:
            flash("ZIP must contain both plants.csv and media.csv", "danger")
            return redirect(request.url)

        # metadata.txt carries the export_id used for duplicate detection.
        export_id = None
        if "metadata.txt" in names:
            meta = z.read("metadata.txt").decode("utf-8", "ignore")
            for line in meta.splitlines():
                if line.startswith("export_id,"):
                    export_id = line.split(",", 1)[1].strip()
                    break
        if not export_id:
            flash("metadata.txt missing or missing export_id", "danger")
            return redirect(request.url)

        if ImportBatch.query.filter_by(
                export_id=export_id, user_id=current_user.id).first():
            flash("This export has already been imported.", "info")
            return redirect(request.url)

        tmpdir = tempfile.mkdtemp()
        z.extractall(tmpdir)

        # --- load and validate plants.csv ---
        plant_path = os.path.join(tmpdir, "plants.csv")
        with open(plant_path, newline="", encoding="utf-8-sig") as pf:
            reader = csv.DictReader(pf)
            if reader.fieldnames != PLANT_HEADERS:
                missing = set(PLANT_HEADERS) - set(reader.fieldnames or [])
                extra = set(reader.fieldnames or []) - set(PLANT_HEADERS)
                flash(f"plants.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
                return redirect(request.url)
            plant_rows = list(reader)

        # --- load and validate media.csv ---
        media_path = os.path.join(tmpdir, "media.csv")
        with open(media_path, newline="", encoding="utf-8-sig") as mf:
            mreader = csv.DictReader(mf)
            if mreader.fieldnames != MEDIA_HEADERS:
                missing = set(MEDIA_HEADERS) - set(mreader.fieldnames or [])
                extra = set(mreader.fieldnames or []) - set(MEDIA_HEADERS)
                flash(f"media.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
                return redirect(request.url)
            media_rows = list(mreader)

        # Record the batch only after validation succeeds, so a rejected
        # upload can be retried with the same export_id.
        batch = ImportBatch(
            export_id=export_id,
            user_id=current_user.id,
            imported_at=datetime.utcnow()
        )
        db.session.add(batch)
        db.session.commit()

        neo = get_neo4j_handler()
        added_plants = 0
        plant_map = {}  # plant uuid -> DB primary key, for media linking

        # --- first pass: insert plants; only set mother_uuid if the parent
        # row was already inserted in this batch (FK-safe ordering) ---
        for row in plant_rows:
            common = PlantCommonName.query.filter_by(name=row["Name"]).first()
            if not common:
                common = PlantCommonName(name=row["Name"])
                db.session.add(common)
                db.session.flush()
            scientific = PlantScientificName.query.filter_by(
                name=row["Scientific Name"]
            ).first()
            if not scientific:
                scientific = PlantScientificName(
                    name=row["Scientific Name"],
                    common_id=common.id
                )
                db.session.add(scientific)
                db.session.flush()

            raw_mu = row.get("Mother UUID") or None
            mu_for_insert = raw_mu if raw_mu in plant_map else None

            p = Plant(
                uuid=row["UUID"],
                common_id=common.id,
                scientific_id=scientific.id,
                plant_type=row["Type"],
                owner_id=current_user.id,
                vendor_name=row["Vendor Name"] or None,
                price=float(row["Price"]) if row["Price"] else None,
                mother_uuid=mu_for_insert,
                notes=row["Notes"] or None,
                short_id=(row.get("Short ID") or None),
                data_verified=True
            )
            db.session.add(p)
            db.session.flush()
            plant_map[p.uuid] = p.id

            log = PlantOwnershipLog(
                plant_id=p.id,
                user_id=current_user.id,
                date_acquired=datetime.utcnow(),
                transferred=False,
                is_verified=True
            )
            db.session.add(log)

            neo.create_plant_node(p.uuid, row["Name"])
            if raw_mu:
                neo.create_lineage(child_uuid=p.uuid, parent_uuid=raw_mu)

            added_plants += 1
        db.session.commit()

        # --- second pass: backfill mother_uuid now that every row exists ---
        for row in plant_rows:
            raw_mu = row.get("Mother UUID") or None
            if raw_mu:
                Plant.query.filter_by(uuid=row["UUID"]).update({
                    'mother_uuid': raw_mu
                })
        db.session.commit()

        # --- import media files referenced by media.csv ---
        added_media = 0
        for mrow in media_rows:
            plant_id = plant_map.get(mrow["Plant UUID"])
            if not plant_id:
                continue

            # Image paths in the ZIP mirror the original uploads/ tree.
            subpath = mrow["Image Path"].split('uploads/', 1)[-1]
            src = os.path.join(tmpdir, "images", subpath)
            if not os.path.isfile(src):
                continue

            try:
                with open(src, "rb") as f:
                    file_storage = FileStorage(
                        stream=io.BytesIO(f.read()),
                        filename=os.path.basename(subpath),
                        content_type='image/jpeg'
                    )
                media = _process_upload_file(
                    file=file_storage,
                    uploader_id=current_user.id,
                    plugin="plant",
                    related_id=plant_id,
                    plant_id=plant_id
                )
                media.uploaded_at = datetime.fromisoformat(mrow["Uploaded At"])
                media.caption = mrow["Source Type"]
                db.session.add(media)
                added_media += 1
            except Exception as e:
                # Best-effort: skip files that fail to import, but log them.
                current_app.logger.warning(
                    f"Failed to import media file: {subpath} → {e}"
                )
                current_app.logger.debug(traceback.format_exc())
        db.session.commit()
        neo.close()

        flash(f"Imported {added_plants} plants and {added_media} images.", "success")
        return redirect(request.url)
    finally:
        # Clean up on every path (the original leaked the open ZipFile, the
        # extraction dir, and — on some error paths — the temp ZIP itself).
        z.close()
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
        if os.path.exists(tmp_zip.name):
            os.remove(tmp_zip.name)


def _handle_csv_review(file):
    """Parse an uploaded CSV and stage its rows in the session for review.

    Rows missing uuid/name/plant_type are skipped.  For each kept row a fuzzy
    match against known scientific names may produce a `suggested` correction
    that the reviewer can accept on the review page.
    """
    try:
        stream = io.StringIO(file.stream.read().decode("utf-8-sig"))
        reader = csv.DictReader(stream)
    except Exception:
        flash("Failed to read CSV file. Ensure it is valid UTF-8.", "error")
        return redirect(request.url)

    headers = set(reader.fieldnames or [])
    missing = REQUIRED_HEADERS - headers
    if missing:
        flash(f"Missing required CSV headers: {missing}", "error")
        return redirect(request.url)

    # Known scientific names, keyed by lowercase name for fuzzy matching.
    all_sci = {s.name.lower(): s for s in PlantScientificName.query.all()}

    review_list = []
    for row in reader:
        uuid_val = row.get("uuid", "").strip().strip('"')
        name = row.get("name", "").strip()
        sci_name = row.get("scientific_name", "").strip()
        plant_type = row.get("plant_type", "").strip() or "plant"
        mother_uuid = row.get("mother_uuid", "").strip().strip('"')

        if not (uuid_val and name and plant_type):
            continue

        # Offer the closest known scientific name (if it differs) as a fix.
        suggestions = difflib.get_close_matches(
            sci_name.lower(), list(all_sci.keys()), n=1, cutoff=0.8
        )
        suggested = (
            all_sci[suggestions[0]].name
            if suggestions and suggestions[0] != sci_name.lower()
            else None
        )

        review_list.append({
            "uuid": uuid_val,
            "name": name,
            "sci_name": sci_name,
            "suggested": suggested,
            "plant_type": plant_type,
            "mother_uuid": mother_uuid
        })

    session["pending_rows"] = review_list
    session["review_list"] = review_list
    return redirect(url_for("utility.review"))


def _handle_direct_media_upload(file):
    """Store a single uploaded file on disk and record a Media row for it.

    Form fields (plugin, related_id, plant_id, growlog_id, caption) arrive as
    strings from request.form and are passed through to the Media model.
    """
    plugin = request.form.get("plugin", "")
    related_id = request.form.get("related_id", 0)
    plant_id = request.form.get("plant_id", None)
    growlog_id = request.form.get("growlog_id", None)
    caption = request.form.get("caption", None)

    now = datetime.utcnow()
    unique_id = str(uuid.uuid4()).replace("-", "")
    secure_name = secure_filename(file.filename)

    # Files are sharded by user id and upload date under UPLOAD_FOLDER.
    storage_path = os.path.join(
        current_app.config["UPLOAD_FOLDER"],
        str(current_user.id),
        now.strftime("%Y/%m/%d")
    )
    os.makedirs(storage_path, exist_ok=True)
    full_file_path = os.path.join(storage_path, f"{unique_id}_{secure_name}")
    file.save(full_file_path)

    file_url = f"/{current_user.id}/{now.strftime('%Y/%m/%d')}/{unique_id}_{secure_name}"

    media = Media(
        plugin=plugin,
        related_id=related_id,
        filename=f"{unique_id}_{secure_name}",
        uploaded_at=now,
        uploader_id=current_user.id,
        caption=caption,
        plant_id=plant_id,
        growlog_id=growlog_id,
        created_at=now,
        file_url=file_url
    )
    db.session.add(media)
    db.session.commit()

    flash("File uploaded and saved successfully.", "success")
    return redirect(request.url)


@bp.route("/upload", methods=["GET", "POST"])
@login_required
def upload():
    """Multi-purpose upload endpoint, dispatched on the uploaded filename:

    * ``*.zip`` — full sub-app export import (plants + media + images),
    * ``*.csv`` — staged plant import with a manual review step,
    * anything else — direct media upload.
    """
    if request.method == "POST":
        file = request.files.get("file")
        if not file or not file.filename:
            flash("No file selected", "error")
            return redirect(request.url)

        filename = file.filename.lower().strip()
        if filename.endswith(".zip"):
            return _handle_zip_import(file)
        if filename.endswith(".csv"):
            return _handle_csv_review(file)
        return _handle_direct_media_upload(file)

    return render_template("utility/upload.html", csrf_token=generate_csrf())


@bp.route("/review", methods=["GET", "POST"])
@login_required
def review():
    """Confirm CSV rows staged by the upload flow, then persist them.

    GET renders the review page; POST creates any missing common/scientific
    names and plants (with ownership logs) and updates Neo4j, honoring the
    per-row ``confirm_<uuid>`` checkboxes that accept suggested name fixes.
    """
    rows = session.get("pending_rows", [])
    review_list = session.get("review_list", [])

    if request.method == "POST":
        neo = get_neo4j_handler()
        added = 0
        all_common = {c.name.lower(): c for c in PlantCommonName.query.all()}
        all_scientific = {s.name.lower(): s for s in PlantScientificName.query.all()}

        for row in rows:
            uuid_val = row.get("uuid")
            name = row.get("name")
            sci_name = row.get("sci_name")
            suggested = row.get("suggested")
            plant_type = row.get("plant_type")
            mother_uuid = row.get("mother_uuid")
            # Checkbox present => reviewer accepted the suggested name.
            accepted = request.form.get(f"confirm_{uuid_val}")

            common = PlantCommonName.query.filter_by(name=name).first()
            if not common:
                common = PlantCommonName(name=name)
                db.session.add(common)
                db.session.flush()
            all_common[common.name.lower()] = common

            use_name = suggested if (suggested and accepted) else sci_name
            scientific = PlantScientificName.query.filter_by(name=use_name).first()
            if not scientific:
                scientific = PlantScientificName(
                    name=use_name,
                    common_id=common.id
                )
                db.session.add(scientific)
                db.session.flush()
            all_scientific[scientific.name.lower()] = scientific

            # Row is "verified" unless a suggestion existed and was declined.
            verified = not suggested or (suggested and accepted)

            plant = Plant.query.filter_by(uuid=uuid_val).first()
            if not plant:
                plant = Plant(
                    uuid=uuid_val,
                    common_id=common.id,
                    scientific_id=scientific.id,
                    plant_type=plant_type,
                    owner_id=current_user.id,
                    mother_uuid=mother_uuid or None,
                    data_verified=verified
                )
                db.session.add(plant)
                db.session.flush()

                log = PlantOwnershipLog(
                    plant_id=plant.id,
                    user_id=current_user.id,
                    date_acquired=datetime.utcnow(),
                    transferred=False,
                    is_verified=verified
                )
                db.session.add(log)
                added += 1

            # Refresh the graph for every row, new or pre-existing.
            neo.create_plant_node(plant.uuid, plant.common.name)
            if mother_uuid:
                neo.create_lineage(child_uuid=plant.uuid, parent_uuid=mother_uuid)

        db.session.commit()
        neo.close()

        flash(f"{added} plants added (MySQL) and Neo4j updated.", "success")
        session.pop("pending_rows", None)
        session.pop("review_list", None)
        return redirect(url_for("utility.upload"))

    return render_template(
        "utility/review.html",
        review_list=review_list,
        csrf_token=generate_csrf()
    )


@bp.route('/export_data', methods=['GET'])
@login_required
def export_data():
    """Export the current user's plants, media metadata, and image files.

    Produces a ZIP containing metadata.txt (export_id for de-duplication),
    plants.csv, media.csv, and the image files under images/.  The CSV
    headers are written from PLANT_HEADERS / MEDIA_HEADERS so the archive
    round-trips through the ZIP import flow, which validates them strictly.
    """
    # Unique export identifier
    export_id = f"{uuid.uuid4()}_{int(datetime.utcnow().timestamp())}"

    # 1) Gather plants
    plants = (
        Plant.query
        .filter_by(owner_id=current_user.id)
        .order_by(Plant.id)
        .all()
    )

    # Build plants.csv (header must match PLANT_HEADERS, incl. "Short ID" —
    # the previous 8-column header made exports un-importable).
    plant_io = io.StringIO()
    pw = csv.writer(plant_io)
    pw.writerow(PLANT_HEADERS)
    for p in plants:
        pw.writerow([
            p.uuid,
            p.plant_type,
            p.common_name.name if p.common_name else '',
            p.scientific_name.name if p.scientific_name else '',
            getattr(p, 'vendor_name', '') or '',
            getattr(p, 'price', '') or '',
            p.mother_uuid or '',
            p.notes or '',
            getattr(p, 'short_id', '') or ''
        ])
    plants_csv = plant_io.getvalue()

    # 2) Gather media attached to this user's plants
    media_records = (
        Media.query
        .filter(Media.uploader_id == current_user.id, Media.plant_id.isnot(None))
        .order_by(Media.id)
        .all()
    )

    # Build media.csv
    media_io = io.StringIO()
    mw = csv.writer(media_io)
    mw.writerow(MEDIA_HEADERS)
    for m in media_records:
        mw.writerow([
            m.plant.uuid,
            m.file_url,
            m.uploaded_at.isoformat() if m.uploaded_at else '',
            m.caption or ''
        ])
    media_csv = media_io.getvalue()

    # 3) Assemble ZIP with images from UPLOAD_FOLDER
    zip_buf = io.BytesIO()
    with zipfile.ZipFile(zip_buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        meta = (
            f"export_id,{export_id}\n"
            f"user_id,{current_user.id}\n"
            f"exported_at,{datetime.utcnow().isoformat()}\n"
        )
        zf.writestr('metadata.txt', meta)
        zf.writestr('plants.csv', plants_csv)
        zf.writestr('media.csv', media_csv)

        media_root = current_app.config['UPLOAD_FOLDER']
        for m in media_records:
            # Mirror the on-disk uploads/ tree under images/ in the archive.
            rel = m.file_url.split('uploads/', 1)[-1]
            abs_path = os.path.join(media_root, rel)
            if os.path.isfile(abs_path):
                arcname = os.path.join('images', rel)
                zf.write(abs_path, arcname)
    zip_buf.seek(0)

    safe_email = re.sub(r'\W+', '_', current_user.email)
    filename = f"{safe_email}_export_{export_id}.zip"
    return send_file(
        zip_buf,
        mimetype='application/zip',
        as_attachment=True,
        download_name=filename
    )


# ────────────────────────────────────────────────────────────────────────────────
# QR-Code Generation Helpers & Routes
# ────────────────────────────────────────────────────────────────────────────────

def generate_label_with_name(qr_url, name, filename):
    """Render a 1.5"x1.5", 300-DPI PNG label: QR code on top, name beneath.

    Returns a Flask file-download response for the generated PNG.
    PIL/qrcode are imported lazily so the module loads without them.
    """
    from PIL import Image, ImageDraw, ImageFont
    import qrcode
    from qrcode.image.pil import PilImage
    from qrcode.constants import ERROR_CORRECT_H

    # Generate QR code
    qr = qrcode.QRCode(version=2, error_correction=ERROR_CORRECT_H, box_size=10, border=1)
    qr.add_data(qr_url)
    qr.make(fit=True)
    qr_img = qr.make_image(
        image_factory=PilImage, fill_color="black", back_color="white"
    ).convert("RGB")

    # 1.5" x 1.5" canvas at 300 DPI => 450x450 px
    dpi = 300
    label_px = int(1.5 * dpi)
    label_img = Image.new("RGB", (label_px, label_px), "white")

    # Center the resized QR code near the top
    qr_size = 350
    qr_img = qr_img.resize((qr_size, qr_size), Image.LANCZOS)
    qr_x = (label_px - qr_size) // 2
    label_img.paste(qr_img, (qr_x, 10))

    # Load font; fall back to PIL's default if the TTF is unavailable
    font_path = os.path.abspath(
        os.path.join(current_app.root_path, '..', 'font', 'ARIALLGT.TTF')
    )
    draw = ImageDraw.Draw(label_img)
    name = (name or '').strip()

    # Shrink the font until the name fits the label width (min size ~10)
    font_size = 28
    while font_size > 10:
        try:
            font = ImageFont.truetype(font_path, font_size)
        except OSError:
            font = ImageFont.load_default()
        if draw.textlength(name, font=font) <= label_px - 20:
            break
        font_size -= 1

    # Still too wide at minimum size: truncate with an ellipsis
    if draw.textlength(name, font=font) > label_px - 20:
        while draw.textlength(name + "…", font=font) > label_px - 20 and len(name) > 1:
            name = name[:-1]
        name += "…"

    # Draw text centered below the QR code
    text_x = (label_px - draw.textlength(name, font=font)) // 2
    text_y = 370
    draw.text((text_x, text_y), name, font=font, fill="black")

    buf = io.BytesIO()
    label_img.save(buf, format='PNG', dpi=(dpi, dpi))
    buf.seek(0)
    return send_file(
        buf,
        mimetype='image/png',
        as_attachment=True,
        download_name=filename
    )


# NOTE(review): the original route strings lacked the <uuid_val> URL variable
# (likely stripped during extraction) even though both views require it —
# restored below so the routes are callable.
@bp.route('/download_qr/<uuid_val>', methods=['GET'])
@login_required
def download_qr(uuid_val):
    """Private "Direct QR" label → f/<short_id> on plant.cards (owner only)."""
    p = Plant.query.filter_by(uuid=uuid_val, owner_id=current_user.id).first_or_404()
    if not getattr(p, 'short_id', None):
        # Lazily assign a short id the first time a label is requested.
        p.short_id = Plant.generate_short_id()
        db.session.commit()
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards')
    qr_url = f"{base}/f/{p.short_id}"
    filename = f"{p.short_id}.png"
    return generate_label_with_name(qr_url, p.common_name.name, filename)


@bp.route('/download_qr_card/<uuid_val>', methods=['GET'])
def download_qr_card(uuid_val):
    """Public "Card QR" label → /<short_id> on plant.cards (no login required)."""
    p = Plant.query.filter_by(uuid=uuid_val).first_or_404()
    if not getattr(p, 'short_id', None):
        p.short_id = Plant.generate_short_id()
        db.session.commit()
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards')
    qr_url = f"{base}/{p.short_id}"
    filename = f"{p.short_id}_card.png"
    return generate_label_with_name(qr_url, p.common_name.name, filename)