Files
natureinpots_community/plugins/utility/routes.py
2025-06-23 04:08:45 -05:00

595 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# plugins/utility/routes.py
# Standard library
import csv
import io
import os
import re
import uuid
import zipfile
import tempfile
import difflib
import traceback
from datetime import datetime
# Thirdparty
from flask import (
Blueprint, request, render_template, redirect, flash,
session, url_for, send_file, current_app
)
from flask_login import login_required, current_user
from flask_wtf.csrf import generate_csrf
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
import qrcode
from PIL import Image, ImageDraw, ImageFont
from qrcode.image.pil import PilImage
from qrcode.constants import ERROR_CORRECT_H
# Application
from app import db
from app.neo4j_utils import get_neo4j_handler
# Plugins
from plugins.plant.models import (
Plant,
PlantCommonName,
PlantScientificName,
PlantOwnershipLog,
)
from plugins.media.models import Media
from plugins.media.routes import _process_upload_file
from plugins.utility.models import ImportBatch
bp = Blueprint(
'utility',
__name__,
template_folder='templates',
url_prefix='/utility'
)
@bp.route("/", methods=["GET"])
@login_required
def index():
return redirect(url_for("utility.upload"))
# ────────────────────────────────────────────────────────────────────────────────
# Required headers for your sub-app export ZIP
PLANT_HEADERS = [
"UUID","Type","Name","Scientific Name",
"Vendor Name","Price","Mother UUID","Notes"
]
MEDIA_HEADERS = [
"Plant UUID","Image Path","Uploaded At","Source Type"
]
# Headers for standalone CSV review flow
REQUIRED_HEADERS = {"uuid", "plant_type", "name", "scientific_name", "mother_uuid"}
@bp.route("/upload", methods=["GET", "POST"])
@login_required
def upload():
if request.method == "POST":
file = request.files.get("file")
if not file or not file.filename:
flash("No file selected", "error")
return redirect(request.url)
filename = file.filename.lower().strip()
# ── ZIP Import Flow ────────────────────────────────────────────────
if filename.endswith(".zip"):
tmp_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
file.save(tmp_zip.name)
tmp_zip.close()
try:
z = zipfile.ZipFile(tmp_zip.name)
except zipfile.BadZipFile:
os.remove(tmp_zip.name)
flash("Uploaded file is not a valid ZIP.", "danger")
return redirect(request.url)
names = z.namelist()
if "plants.csv" not in names or "media.csv" not in names:
os.remove(tmp_zip.name)
flash("ZIP must contain both plants.csv and media.csv", "danger")
return redirect(request.url)
export_id = None
if "metadata.txt" in names:
meta = z.read("metadata.txt").decode("utf-8", "ignore")
for line in meta.splitlines():
if line.startswith("export_id,"):
export_id = line.split(",", 1)[1].strip()
break
if not export_id:
os.remove(tmp_zip.name)
flash("metadata.txt missing or missing export_id", "danger")
return redirect(request.url)
if ImportBatch.query.filter_by(export_id=export_id, user_id=current_user.id).first():
os.remove(tmp_zip.name)
flash("This export has already been imported.", "info")
return redirect(request.url)
batch = ImportBatch(
export_id=export_id,
user_id=current_user.id,
imported_at=datetime.utcnow()
)
db.session.add(batch)
db.session.commit()
tmpdir = tempfile.mkdtemp()
z.extractall(tmpdir)
# --- load and validate plants.csv ---
plant_path = os.path.join(tmpdir, "plants.csv")
with open(plant_path, newline="", encoding="utf-8-sig") as pf:
reader = csv.DictReader(pf)
if reader.fieldnames != PLANT_HEADERS:
missing = set(PLANT_HEADERS) - set(reader.fieldnames or [])
extra = set(reader.fieldnames or []) - set(PLANT_HEADERS)
os.remove(tmp_zip.name)
flash(f"plants.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
return redirect(request.url)
plant_rows = list(reader)
# --- load and validate media.csv ---
media_path = os.path.join(tmpdir, "media.csv")
with open(media_path, newline="", encoding="utf-8-sig") as mf:
mreader = csv.DictReader(mf)
if mreader.fieldnames != MEDIA_HEADERS:
missing = set(MEDIA_HEADERS) - set(mreader.fieldnames or [])
extra = set(mreader.fieldnames or []) - set(MEDIA_HEADERS)
os.remove(tmp_zip.name)
flash(f"media.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
return redirect(request.url)
media_rows = list(mreader)
# --- import plants ---
neo = get_neo4j_handler()
added_plants = 0
plant_map = {}
for row in plant_rows:
common = PlantCommonName.query.filter_by(name=row["Name"]).first()
if not common:
common = PlantCommonName(name=row["Name"])
db.session.add(common)
db.session.flush()
scientific = PlantScientificName.query.filter_by(name=row["Scientific Name"]).first()
if not scientific:
scientific = PlantScientificName(
name=row["Scientific Name"],
common_id=common.id
)
db.session.add(scientific)
db.session.flush()
p = Plant(
uuid=row["UUID"],
common_id=common.id,
scientific_id=scientific.id,
plant_type=row["Type"],
owner_id=current_user.id,
data_verified=True
)
db.session.add(p)
db.session.flush()
plant_map[p.uuid] = p.id
log = PlantOwnershipLog(
plant_id=p.id,
user_id=current_user.id,
date_acquired=datetime.utcnow(),
transferred=False,
is_verified=True
)
db.session.add(log)
neo.create_plant_node(p.uuid, row["Name"])
if row.get("Mother UUID"):
neo.create_lineage(
child_uuid=p.uuid,
parent_uuid=row["Mother UUID"]
)
added_plants += 1
# --- import media (FIX: now passing plant_id) ---
added_media = 0
for mrow in media_rows:
plant_uuid = mrow["Plant UUID"]
plant_id = plant_map.get(plant_uuid)
if not plant_id:
continue
subpath = mrow["Image Path"].split('uploads/', 1)[-1]
src = os.path.join(tmpdir, "images", subpath)
if not os.path.isfile(src):
continue
try:
with open(src, "rb") as f:
file_storage = FileStorage(
stream=io.BytesIO(f.read()),
filename=os.path.basename(subpath),
content_type='image/jpeg'
)
media = _process_upload_file(
file=file_storage,
uploader_id=current_user.id,
plugin="plant",
related_id=plant_id,
plant_id=plant_id # ← ensure the FK is set!
)
media.uploaded_at = datetime.fromisoformat(mrow["Uploaded At"])
media.caption = mrow["Source Type"]
db.session.add(media)
added_media += 1
except Exception as e:
current_app.logger.warning(
f"Failed to import media file: {subpath}{e}"
)
current_app.logger.debug(traceback.format_exc())
db.session.commit()
neo.close()
os.remove(tmp_zip.name)
flash(f"Imported {added_plants} plants and {added_media} images.", "success")
return redirect(request.url)
# ── CSV Review Flow ─────────────────────────────────────────────────
if filename.endswith(".csv"):
try:
stream = io.StringIO(file.stream.read().decode("utf-8-sig"))
reader = csv.DictReader(stream)
except Exception:
flash("Failed to read CSV file. Ensure it is valid UTF-8.", "error")
return redirect(request.url)
headers = set(reader.fieldnames or [])
missing = REQUIRED_HEADERS - headers
if missing:
flash(f"Missing required CSV headers: {missing}", "error")
return redirect(request.url)
session["pending_rows"] = []
review_list = []
all_common = {c.name.lower(): c for c in PlantCommonName.query.all()}
all_sci = {s.name.lower(): s for s in PlantScientificName.query.all()}
for row in reader:
uuid_val = row.get("uuid", "").strip().strip('"')
name = row.get("name", "").strip()
sci_name = row.get("scientific_name", "").strip()
plant_type = row.get("plant_type", "").strip() or "plant"
mother_uuid= row.get("mother_uuid", "").strip().strip('"')
if not (uuid_val and name and plant_type):
continue
suggestions = difflib.get_close_matches(
sci_name.lower(),
list(all_sci.keys()),
n=1,
cutoff=0.8
)
suggested = (
all_sci[suggestions[0]].name
if suggestions and suggestions[0] != sci_name.lower()
else None
)
item = {
"uuid": uuid_val,
"name": name,
"sci_name": sci_name,
"suggested": suggested,
"plant_type": plant_type,
"mother_uuid": mother_uuid
}
review_list.append(item)
session["pending_rows"].append(item)
session["review_list"] = review_list
return redirect(url_for("utility.review"))
# ── Direct Media Upload Flow ───────────────────────────────────────
plugin = request.form.get("plugin", '')
related_id = request.form.get("related_id", 0)
plant_id = request.form.get("plant_id", None)
growlog_id = request.form.get("growlog_id", None)
caption = request.form.get("caption", None)
now = datetime.utcnow()
unique_id = str(uuid.uuid4()).replace("-", "")
secure_name= secure_filename(file.filename)
storage_path = os.path.join(
current_app.config['UPLOAD_FOLDER'],
str(current_user.id),
now.strftime('%Y/%m/%d')
)
os.makedirs(storage_path, exist_ok=True)
full_file_path = os.path.join(storage_path, f"{unique_id}_{secure_name}")
file.save(full_file_path)
file_url = f"/{current_user.id}/{now.strftime('%Y/%m/%d')}/{unique_id}_{secure_name}"
media = Media(
plugin=plugin,
related_id=related_id,
filename=f"{unique_id}_{secure_name}",
uploaded_at=now,
uploader_id=current_user.id,
caption=caption,
plant_id=plant_id,
growlog_id=growlog_id,
created_at=now,
file_url=file_url
)
db.session.add(media)
db.session.commit()
flash("File uploaded and saved successfully.", "success")
return redirect(request.url)
return render_template("utility/upload.html", csrf_token=generate_csrf())
@bp.route("/review", methods=["GET", "POST"])
@login_required
def review():
rows = session.get("pending_rows", [])
review_list = session.get("review_list", [])
if request.method == "POST":
neo = get_neo4j_handler()
added = 0
all_common = {c.name.lower(): c for c in PlantCommonName.query.all()}
all_scientific = {s.name.lower(): s for s in PlantScientificName.query.all()}
for row in rows:
uuid_val = row.get("uuid")
name = row.get("name")
sci_name = row.get("sci_name")
suggested = row.get("suggested")
plant_type = row.get("plant_type")
mother_uuid = row.get("mother_uuid")
accepted = request.form.get(f"confirm_{uuid_val}")
common = PlantCommonName.query.filter_by(name=name).first()
if not common:
common = PlantCommonName(name=name)
db.session.add(common)
db.session.flush()
all_common[common.name.lower()] = common
use_name = suggested if (suggested and accepted) else sci_name
scientific = PlantScientificName.query.filter_by(name=use_name).first()
if not scientific:
scientific = PlantScientificName(
name = use_name,
common_id = common.id
)
db.session.add(scientific)
db.session.flush()
all_scientific[scientific.name.lower()] = scientific
verified = not suggested or (suggested and accepted)
plant = Plant.query.filter_by(uuid=uuid_val).first()
if not plant:
plant = Plant(
uuid = uuid_val,
common_id = common.id,
scientific_id = scientific.id,
plant_type = plant_type,
owner_id = current_user.id,
data_verified = verified
)
db.session.add(plant)
db.session.flush()
log = PlantOwnershipLog(
plant_id = plant.id,
user_id = current_user.id,
date_acquired = datetime.utcnow(),
transferred = False,
is_verified = verified
)
db.session.add(log)
added += 1
neo.create_plant_node(plant.uuid, plant.common.name)
if mother_uuid:
neo.create_lineage(child_uuid=plant.uuid, parent_uuid=mother_uuid)
db.session.commit()
neo.close()
flash(f"{added} plants added (MySQL) and Neo4j updated.", "success")
session.pop("pending_rows", None)
session.pop("review_list", None)
return redirect(url_for("utility.upload"))
return render_template(
"utility/review.html",
review_list=review_list,
csrf_token=generate_csrf()
)
@bp.route('/export_data', methods=['GET'])
@login_required
def export_data():
# Unique export identifier
export_id = f"{uuid.uuid4()}_{int(datetime.utcnow().timestamp())}"
# 1) Gather plants
plants = (
Plant.query
.filter_by(owner_id=current_user.id)
.order_by(Plant.id)
.all()
)
# Build plants.csv
plant_io = io.StringIO()
pw = csv.writer(plant_io)
pw.writerow([
'UUID', 'Type', 'Name', 'Scientific Name',
'Vendor Name', 'Price', 'Mother UUID', 'Notes'
])
for p in plants:
pw.writerow([
p.uuid,
p.plant_type,
p.common_name.name if p.common_name else '',
p.scientific_name.name if p.scientific_name else '',
getattr(p, 'vendor_name', '') or '',
getattr(p, 'price', '') or '',
p.mother_uuid or '',
p.notes or ''
])
plants_csv = plant_io.getvalue()
# 2) Gather media
media_records = (
Media.query
.filter(Media.uploader_id == current_user.id, Media.plant_id.isnot(None))
.order_by(Media.id)
.all()
)
# Build media.csv
media_io = io.StringIO()
mw = csv.writer(media_io)
mw.writerow([
'Plant UUID', 'Image Path', 'Uploaded At', 'Source Type'
])
for m in media_records:
mw.writerow([
m.plant.uuid,
m.file_url,
m.uploaded_at.isoformat() if m.uploaded_at else '',
m.caption or ''
])
media_csv = media_io.getvalue()
# 3) Assemble ZIP with images from UPLOAD_FOLDER
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, 'w', zipfile.ZIP_DEFLATED) as zf:
meta = (
f"export_id,{export_id}\n"
f"user_id,{current_user.id}\n"
f"exported_at,{datetime.utcnow().isoformat()}\n"
)
zf.writestr('metadata.txt', meta)
zf.writestr('plants.csv', plants_csv)
zf.writestr('media.csv', media_csv)
media_root = current_app.config['UPLOAD_FOLDER']
for m in media_records:
rel = m.file_url.split('uploads/', 1)[-1]
abs_path = os.path.join(media_root, rel)
if os.path.isfile(abs_path):
arcname = os.path.join('images', rel)
zf.write(abs_path, arcname)
zip_buf.seek(0)
safe_email = re.sub(r'\W+', '_', current_user.email)
filename = f"{safe_email}_export_{export_id}.zip"
return send_file(
zip_buf,
mimetype='application/zip',
as_attachment=True,
download_name=filename
)
# ────────────────────────────────────────────────────────────────────────────────
# QR-Code Generation Helpers & Routes
# ────────────────────────────────────────────────────────────────────────────────
def generate_label_with_name(qr_url, name, download_filename):
"""
Build a 1.5"x1.5" PNG (300dpi) with a QR code
and the plant name underneath.
"""
qr = qrcode.QRCode(version=2, error_correction=ERROR_CORRECT_H, box_size=10, border=1)
qr.add_data(qr_url)
qr.make(fit=True)
qr_img = qr.make_image(image_factory=PilImage, fill_color="black", back_color="white").convert("RGB")
dpi = 300
label_px = int(1.5 * dpi)
canvas_h = label_px + 400
label_img = Image.new("RGB", (label_px, canvas_h), "white")
label_img.paste(qr_img.resize((label_px, label_px)), (0, 0))
font_path = os.path.join(current_app.root_path, '..', 'font', 'ARIALLGT.TTF')
draw = ImageDraw.Draw(label_img)
text = (name or '').strip()
font_size = 28
while font_size > 10:
try:
font = ImageFont.truetype(font_path, font_size)
except OSError:
font = ImageFont.load_default()
if draw.textlength(text, font=font) <= label_px - 20:
break
font_size -= 1
while draw.textlength(text, font=font) > label_px - 20 and len(text) > 1:
text = text[:-1]
if len(text) < len((name or '').strip()):
text += ""
x = (label_px - draw.textlength(text, font=font)) // 2
y = label_px + 20
draw.text((x, y), text, font=font, fill="black")
buf = io.BytesIO()
label_img.save(buf, format='PNG', dpi=(dpi, dpi))
buf.seek(0)
return send_file(buf, mimetype='image/png', download_name=download_filename, as_attachment=True)
@bp.route('/<uuid:uuid_val>/download_qr', methods=['GET'])
def download_qr(uuid_val):
p = Plant.query.filter_by(uuid=uuid_val).first_or_404()
if not p.short_id:
p.short_id = Plant.generate_short_id()
db.session.commit()
qr_url = f'https://plant.cards/{p.short_id}'
filename = f"{p.short_id}.png"
return generate_label_with_name(
qr_url,
p.common_name.name or p.scientific_name,
filename
)
@bp.route('/<uuid:uuid_val>/download_qr_card', methods=['GET'])
def download_qr_card(uuid_val):
p = Plant.query.filter_by(uuid=uuid_val).first_or_404()
if not p.short_id:
p.short_id = Plant.generate_short_id()
db.session.commit()
qr_url = f'https://plant.cards/{p.short_id}'
filename = f"{p.short_id}_card.png"
return generate_label_with_name(
qr_url,
p.common_name.name or p.scientific_name,
filename
)