Files
natureinpots_community/plugins/utility/routes.py
2025-07-09 01:05:45 -05:00

659 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# File: plugins/utility/routes.py
# Standard library
import csv
import io
import os
import re
import uuid
import zipfile
import tempfile
import difflib
import traceback
from datetime import datetime
# Thirdparty
from flask import (
Blueprint, request, render_template, redirect, flash,
session, url_for, send_file, current_app
)
from flask_login import login_required, current_user
from flask_wtf.csrf import generate_csrf
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
# Application
from app import db
from app.neo4j_utils import get_neo4j_handler
# Plugins
from plugins.plant.models import (
Plant,
PlantCommonName,
PlantScientificName,
PlantOwnershipLog,
)
from plugins.media.models import Media
from plugins.utility.models import ImportBatch
from plugins.utility.tasks import import_text_data
# Blueprint for utility routes (ZIP import/export, CSV review, direct media
# uploads, QR label generation), mounted under /utility.
bp = Blueprint(
    'utility',
    __name__,
    template_folder='templates',
    url_prefix='/utility'
)
@bp.route("/", methods=["GET"])
@login_required
def index():
    """Utility landing page: send the user straight to the upload form."""
    upload_page = url_for("utility.upload")
    return redirect(upload_page)
@bp.route("/imports", methods=["GET"])
@login_required
def imports():
    """List the current user's 20 most recent import batches, newest first."""
    recent_batches = (
        ImportBatch.query
        .filter_by(user_id=current_user.id)
        .order_by(ImportBatch.imported_at.desc())
        .limit(20)
        .all()
    )
    return render_template("utility/imports.html", batches=recent_batches)
# ────────────────────────────────────────────────────────────────────────────────
# Exact CSV headers for the ZIP import flow: plants.csv / media.csv must match
# these lists exactly (including column order) or the import is rejected.
PLANT_HEADERS = [
    "UUID","Type","Name","Scientific Name",
    "Vendor Name","Price","Mother UUID","Notes",
    "Short ID"
]
MEDIA_HEADERS = [
    "Plant UUID","Image Path","Uploaded At","Source Type"
]
# Lower-case column names required in a bare CSV submitted for manual review.
REQUIRED_HEADERS = {"uuid", "plant_type", "name", "scientific_name", "mother_uuid"}
@bp.route("/upload", methods=["GET", "POST"])
@login_required
def upload():
    """Multi-purpose upload endpoint; dispatches on the file extension.

    * ``.zip``  -- full data import (plants.csv + media.csv + images),
      queued to Celery when possible, with an inline fallback import.
    * ``.csv``  -- plant rows staged into the session for manual review
      (see :func:`review`).
    * anything else -- treated as a direct media-file upload.

    GET renders the upload form.
    """
    if request.method == "POST":
        file = request.files.get("file")
        if not file or not file.filename:
            flash("No file selected", "error")
            return redirect(request.url)

        filename = file.filename.lower().strip()

        # ── ZIP Import Flow ────────────────────────────────────────────────
        if filename.endswith(".zip"):
            # Persist the upload to disk so the Celery worker (a separate
            # process) can read it.
            tmp_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
            file.save(tmp_zip.name)
            tmp_zip.close()

            # validate ZIP
            try:
                z = zipfile.ZipFile(tmp_zip.name)
            except zipfile.BadZipFile:
                os.remove(tmp_zip.name)
                flash("Uploaded file is not a valid ZIP.", "danger")
                return redirect(request.url)

            names = z.namelist()
            if "plants.csv" not in names or "media.csv" not in names:
                z.close()
                os.remove(tmp_zip.name)
                flash("ZIP must contain both plants.csv and media.csv", "danger")
                return redirect(request.url)

            # extract export_id from metadata.txt (written by export_data)
            export_id = None
            if "metadata.txt" in names:
                meta = z.read("metadata.txt").decode("utf-8", "ignore")
                for line in meta.splitlines():
                    if line.startswith("export_id,"):
                        export_id = line.split(",", 1)[1].strip()
                        break
            if not export_id:
                z.close()
                os.remove(tmp_zip.name)
                flash("metadata.txt missing or missing export_id", "danger")
                return redirect(request.url)

            # prevent re-importing the same export twice
            if ImportBatch.query.filter_by(export_id=export_id, user_id=current_user.id).first():
                z.close()
                os.remove(tmp_zip.name)
                flash("This export has already been imported.", "info")
                return redirect(request.url)

            # record the batch up front so duplicate submissions are blocked
            batch = ImportBatch(
                export_id   = export_id,
                user_id     = current_user.id,
                imported_at = datetime.utcnow(),
                status      = 'pending'
            )
            db.session.add(batch)
            db.session.commit()

            # hand off to Celery; tmp_zip is left on disk for the worker
            try:
                import_text_data.delay(tmp_zip.name, "zip", batch.id)
                flash("ZIP received; import queued in background.", "success")
                return redirect(request.url)
            except Exception:
                current_app.logger.exception("Failed to enqueue import_text_data")
                flash("Failed to queue import job; falling back to inline import", "warning")

            # ── Fallback: inline import ─────────────────────────────────────
            import shutil  # local import: only needed on this rare path

            tmpdir = tempfile.mkdtemp()
            z.extractall(tmpdir)
            z.close()

            # load plants.csv (header must match the export format exactly)
            plant_path = os.path.join(tmpdir, "plants.csv")
            with open(plant_path, newline="", encoding="utf-8-sig") as pf:
                reader = csv.DictReader(pf)
                if reader.fieldnames != PLANT_HEADERS:
                    missing = set(PLANT_HEADERS) - set(reader.fieldnames or [])
                    extra = set(reader.fieldnames or []) - set(PLANT_HEADERS)
                    shutil.rmtree(tmpdir, ignore_errors=True)
                    os.remove(tmp_zip.name)
                    flash(f"plants.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
                    return redirect(request.url)
                plant_rows = list(reader)

            # load media.csv
            media_path = os.path.join(tmpdir, "media.csv")
            with open(media_path, newline="", encoding="utf-8-sig") as mf:
                mreader = csv.DictReader(mf)
                if mreader.fieldnames != MEDIA_HEADERS:
                    missing = set(MEDIA_HEADERS) - set(mreader.fieldnames or [])
                    extra = set(mreader.fieldnames or []) - set(MEDIA_HEADERS)
                    shutil.rmtree(tmpdir, ignore_errors=True)
                    os.remove(tmp_zip.name)
                    flash(f"media.csv header mismatch. Missing: {missing}, Extra: {extra}", "danger")
                    return redirect(request.url)
                media_rows = list(mreader)

            # import plants
            neo = get_neo4j_handler()
            plant_map = {}          # plant UUID -> newly created MySQL id
            added_plants = 0
            for row in plant_rows:
                # common name: create on first sight
                common = PlantCommonName.query.filter_by(name=row["Name"]).first()
                if not common:
                    common = PlantCommonName(name=row["Name"])
                    db.session.add(common)
                    db.session.flush()
                # scientific name: create on first sight
                scientific = PlantScientificName.query.filter_by(name=row["Scientific Name"]).first()
                if not scientific:
                    scientific = PlantScientificName(
                        name      = row["Scientific Name"],
                        common_id = common.id
                    )
                    db.session.add(scientific)
                    db.session.flush()

                # Only set mother_uuid now if the mother row was already
                # inserted; the rest are backfilled after the first pass.
                raw_mu = row.get("Mother UUID") or None
                mu_for_insert = raw_mu if raw_mu in plant_map else None

                p = Plant(
                    uuid          = row["UUID"],
                    common_id     = common.id,
                    scientific_id = scientific.id,
                    plant_type    = row["Type"],
                    owner_id      = current_user.id,
                    vendor_name   = row["Vendor Name"] or None,
                    price         = float(row["Price"]) if row["Price"] else None,
                    mother_uuid   = mu_for_insert,
                    notes         = row["Notes"] or None,
                    short_id      = row.get("Short ID") or None,
                    data_verified = True
                )
                db.session.add(p)
                db.session.flush()
                plant_map[p.uuid] = p.id

                log = PlantOwnershipLog(
                    plant_id      = p.id,
                    user_id       = current_user.id,
                    date_acquired = datetime.utcnow(),
                    transferred   = False,
                    is_verified   = True
                )
                db.session.add(log)

                # mirror into Neo4j
                neo.create_plant_node(p.uuid, row["Name"])
                if raw_mu:
                    neo.create_lineage(child_uuid=p.uuid, parent_uuid=raw_mu)
                added_plants += 1
            db.session.commit()

            # backfill mother links that were skipped in the first pass
            for row in plant_rows:
                if row.get("Mother UUID"):
                    Plant.query.filter_by(uuid=row["UUID"]).update({
                        'mother_uuid': row["Mother UUID"]
                    })
            db.session.commit()

            # import media images
            added_media = 0
            for mrow in media_rows:
                puuid = mrow["Plant UUID"]
                pid = plant_map.get(puuid)
                if not pid:
                    continue
                # Strip any leading slash: os.path.join discards the tmpdir
                # prefix when its second argument is absolute.
                subpath = mrow["Image Path"].split('uploads/', 1)[-1].lstrip('/')
                src = os.path.join(tmpdir, "images", subpath)
                if not os.path.isfile(src):
                    continue
                try:
                    # build FileStorage for convenience
                    with open(src, "rb") as f:
                        fs = FileStorage(
                            stream       = io.BytesIO(f.read()),
                            filename     = os.path.basename(subpath),
                            content_type = 'image/jpeg'
                        )
                    # save into UPLOAD_FOLDER/<user>/<Y/m/d>/
                    now = datetime.utcnow()
                    secure_name = secure_filename(fs.filename)
                    storage_dir = os.path.join(
                        current_app.config["UPLOAD_FOLDER"],
                        str(current_user.id),
                        now.strftime("%Y/%m/%d")
                    )
                    os.makedirs(storage_dir, exist_ok=True)
                    unique_name = f"{uuid.uuid4().hex}_{secure_name}"
                    full_path = os.path.join(storage_dir, unique_name)
                    fs.save(full_path)
                    file_url = f"/{current_user.id}/{now.strftime('%Y/%m/%d')}/{unique_name}"
                    media = Media(
                        plugin      = "plant",
                        related_id  = pid,
                        filename    = unique_name,
                        uploaded_at = datetime.fromisoformat(mrow["Uploaded At"]),
                        uploader_id = current_user.id,
                        caption     = mrow["Source Type"],
                        plant_id    = pid,
                        created_at  = datetime.fromisoformat(mrow["Uploaded At"]),
                        file_url    = file_url
                    )
                    db.session.add(media)
                    added_media += 1
                except Exception as e:
                    # best-effort: skip this file but keep importing the rest
                    current_app.logger.warning(f"Failed to import media file {subpath}: {e}")
                    current_app.logger.debug(traceback.format_exc())
            db.session.commit()
            neo.close()

            # clean up temp artefacts (tmpdir was previously leaked)
            shutil.rmtree(tmpdir, ignore_errors=True)
            os.remove(tmp_zip.name)

            flash(f"Imported {added_plants} plants and {added_media} images.", "success")
            return redirect(request.url)

        # ── CSV Review Flow ─────────────────────────────────────────────────
        if filename.endswith(".csv"):
            try:
                stream = io.StringIO(file.stream.read().decode("utf-8-sig"))
                reader = csv.DictReader(stream)
            except Exception:
                flash("Failed to read CSV file. Ensure it is valid UTF-8.", "error")
                return redirect(request.url)

            headers = set(reader.fieldnames or [])
            missing = REQUIRED_HEADERS - headers
            if missing:
                flash(f"Missing required CSV headers: {missing}", "error")
                return redirect(request.url)

            session["pending_rows"] = []
            review_list = []
            # known scientific names, lower-cased for fuzzy matching
            all_sci = {s.name.lower(): s for s in PlantScientificName.query.all()}
            for row in reader:
                uuid_val    = row.get("uuid", "").strip().strip('"')
                name        = row.get("name", "").strip()
                sci_name    = row.get("scientific_name", "").strip()
                plant_type  = row.get("plant_type", "").strip() or "plant"
                mother_uuid = row.get("mother_uuid", "").strip().strip('"')
                if not (uuid_val and name and plant_type):
                    continue
                # offer the closest known scientific name as a correction
                suggestions = difflib.get_close_matches(
                    sci_name.lower(), list(all_sci.keys()),
                    n=1, cutoff=0.8
                )
                suggested = None
                if suggestions and suggestions[0] != sci_name.lower():
                    suggested = all_sci[suggestions[0]].name
                item = {
                    "uuid": uuid_val,
                    "name": name,
                    "sci_name": sci_name,
                    "suggested": suggested,
                    "plant_type": plant_type,
                    "mother_uuid": mother_uuid
                }
                review_list.append(item)
                session["pending_rows"].append(item)
            session["review_list"] = review_list
            return redirect(url_for("utility.review"))

        # ── Direct Media Upload Flow ───────────────────────────────────────
        # NOTE(review): related_id/plant_id/growlog_id arrive as form strings
        # (or None); presumably the Media model/DB coerces them — confirm.
        plugin     = request.form.get("plugin", "")
        related_id = request.form.get("related_id", 0)
        plant_id   = request.form.get("plant_id", None)
        growlog_id = request.form.get("growlog_id", None)
        caption    = request.form.get("caption", None)

        now = datetime.utcnow()
        unique_id = uuid.uuid4().hex
        secure_name = secure_filename(file.filename)
        storage_path = os.path.join(
            current_app.config["UPLOAD_FOLDER"],
            str(current_user.id),
            now.strftime("%Y/%m/%d")
        )
        os.makedirs(storage_path, exist_ok=True)
        unique_name = f"{unique_id}_{secure_name}"
        full_path = os.path.join(storage_path, unique_name)
        file.save(full_path)
        file_url = f"/{current_user.id}/{now.strftime('%Y/%m/%d')}/{unique_name}"
        media = Media(
            plugin      = plugin,
            related_id  = related_id,
            filename    = unique_name,
            uploaded_at = now,
            uploader_id = current_user.id,
            caption     = caption,
            plant_id    = plant_id,
            growlog_id  = growlog_id,
            created_at  = now,
            file_url    = file_url
        )
        db.session.add(media)
        db.session.commit()
        flash("File uploaded and saved successfully.", "success")
        return redirect(request.url)

    return render_template("utility/upload.html", csrf_token=generate_csrf())
@bp.route("/review", methods=["GET", "POST"])
@login_required
def review():
    """Review screen for CSV-staged plant rows.

    GET renders the rows staged in the session by the CSV upload flow.
    POST commits them: creates missing common/scientific names, inserts
    plants that do not already exist (with an ownership-log entry), and
    mirrors nodes and lineage into Neo4j.
    """
    rows = session.get("pending_rows", [])
    review_list = session.get("review_list", [])

    if request.method == "POST":
        neo = get_neo4j_handler()
        added = 0
        # (removed: all_common/all_scientific caches that were populated but
        # never read — lookups go straight to the DB below)
        for row in rows:
            uuid_val    = row["uuid"]
            name        = row["name"]
            sci_name    = row["sci_name"]
            suggested   = row["suggested"]
            plant_type  = row["plant_type"]
            mother_uuid = row["mother_uuid"]
            # did the user accept the suggested scientific-name correction?
            accepted = request.form.get(f"confirm_{uuid_val}") == "yes"

            # common name: create on first sight
            common = PlantCommonName.query.filter_by(name=name).first()
            if not common:
                common = PlantCommonName(name=name)
                db.session.add(common)
                db.session.flush()

            # scientific name: use the suggestion only when accepted
            use_name = suggested if (suggested and accepted) else sci_name
            scientific = PlantScientificName.query.filter_by(name=use_name).first()
            if not scientific:
                scientific = PlantScientificName(
                    name      = use_name,
                    common_id = common.id
                )
                db.session.add(scientific)
                db.session.flush()

            # verified unless a suggestion was offered and rejected
            verified = not suggested or (suggested and accepted)

            plant = Plant.query.filter_by(uuid=uuid_val).first()
            if not plant:
                plant = Plant(
                    uuid          = uuid_val,
                    common_id     = common.id,
                    scientific_id = scientific.id,
                    plant_type    = plant_type,
                    owner_id      = current_user.id,
                    mother_uuid   = mother_uuid or None,
                    data_verified = verified
                )
                db.session.add(plant)
                db.session.flush()
                log = PlantOwnershipLog(
                    plant_id      = plant.id,
                    user_id       = current_user.id,
                    date_acquired = datetime.utcnow(),
                    transferred   = False,
                    is_verified   = verified
                )
                db.session.add(log)
                added += 1

            neo.create_plant_node(plant.uuid, plant.common.name)
            if mother_uuid:
                neo.create_lineage(child_uuid=plant.uuid, parent_uuid=mother_uuid)

        db.session.commit()
        neo.close()
        flash(f"{added} plants added (MySQL) and Neo4j updated.", "success")
        session.pop("pending_rows", None)
        session.pop("review_list", None)
        return redirect(url_for("utility.upload"))

    return render_template(
        "utility/review.html",
        review_list=review_list,
        csrf_token=generate_csrf()
    )
@bp.route('/export_data', methods=['GET'])
@login_required
def export_data():
    """Export the current user's plants and media as a downloadable ZIP.

    The archive contains metadata.txt (export_id for duplicate detection),
    plants.csv, media.csv, and the image files under images/. Headers are
    written from PLANT_HEADERS / MEDIA_HEADERS so the archive round-trips
    through upload()'s strict header validation (the previous version
    omitted the "Short ID" column and failed its own importer).
    """
    export_id = f"{uuid.uuid4()}_{int(datetime.utcnow().timestamp())}"
    plants = (
        Plant.query.filter_by(owner_id=current_user.id)
        .order_by(Plant.id).all()
    )

    # build plants.csv
    plant_io = io.StringIO()
    pw = csv.writer(plant_io)
    pw.writerow(PLANT_HEADERS)
    for p in plants:
        pw.writerow([
            p.uuid,
            p.plant_type,
            p.common_name.name if p.common_name else '',
            p.scientific_name.name if p.scientific_name else '',
            getattr(p, 'vendor_name', '') or '',
            getattr(p, 'price', '') or '',
            p.mother_uuid or '',
            p.notes or '',
            getattr(p, 'short_id', '') or ''
        ])
    plants_csv = plant_io.getvalue()

    # build media.csv
    media_records = (
        Media.query.filter(
            Media.uploader_id == current_user.id,
            Media.plant_id.isnot(None)
        ).order_by(Media.id).all()
    )
    media_io = io.StringIO()
    mw = csv.writer(media_io)
    mw.writerow(MEDIA_HEADERS)
    for m in media_records:
        mw.writerow([
            m.plant.uuid,
            m.file_url,
            m.uploaded_at.isoformat() if m.uploaded_at else '',
            m.caption or ''
        ])
    media_csv = media_io.getvalue()

    # assemble ZIP
    zip_buf = io.BytesIO()
    with zipfile.ZipFile(zip_buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        meta = (
            f"export_id,{export_id}\n"
            f"user_id,{current_user.id}\n"
            f"exported_at,{datetime.utcnow().isoformat()}\n"
        )
        zf.writestr('metadata.txt', meta)
        zf.writestr('plants.csv', plants_csv)
        zf.writestr('media.csv', media_csv)

        media_root = current_app.config['UPLOAD_FOLDER']
        for m in media_records:
            # Strip any leading slash: os.path.join discards media_root when
            # its second argument is absolute, so images were never bundled.
            rel = m.file_url.split('uploads/', 1)[-1].lstrip('/')
            abs_path = os.path.join(media_root, rel)
            if os.path.isfile(abs_path):
                arcname = os.path.join('images', rel)
                zf.write(abs_path, arcname)

    zip_buf.seek(0)
    safe_email = re.sub(r'\W+', '_', current_user.email)
    filename = f"{safe_email}_export_{export_id}.zip"
    return send_file(
        zip_buf,
        mimetype='application/zip',
        as_attachment=True,
        download_name=filename
    )
def generate_label_with_name(qr_url, name, filename):
    """Render a 1.5in x 1.5in, 300-DPI PNG label (QR code + plant name) and
    return it as a Flask file download.

    :param qr_url:   URL encoded into the QR code.
    :param name:     Text drawn beneath the QR code; shrunk, then truncated
                     with an ellipsis, to fit the label width.
    :param filename: Download name for the generated PNG.
    """
    from PIL import Image, ImageDraw, ImageFont
    import qrcode
    from qrcode.image.pil import PilImage
    from qrcode.constants import ERROR_CORRECT_H
    from flask import send_file

    qr = qrcode.QRCode(
        version=2,
        error_correction=ERROR_CORRECT_H,
        box_size=10,
        border=1
    )
    qr.add_data(qr_url)
    qr.make(fit=True)
    qr_img = qr.make_image(
        image_factory=PilImage,
        fill_color="black",
        back_color="white"
    ).convert("RGB")

    dpi = 300
    label_px = int(1.5 * dpi)  # 1.5 inch square at 300 DPI -> 450 px
    label_img = Image.new("RGB", (label_px, label_px), "white")

    qr_size = 350
    qr_img = qr_img.resize((qr_size, qr_size), Image.LANCZOS)
    qr_x = (label_px - qr_size) // 2
    label_img.paste(qr_img, (qr_x, 10))

    font_path = os.path.abspath(
        os.path.join(
            current_app.root_path, '..', 'font', 'ARIALLGT.TTF'
        )
    )
    draw = ImageDraw.Draw(label_img)
    name = (name or '').strip()

    # shrink the font until the name fits the label width (10 px minimum)
    font = ImageFont.load_default()
    font_size = 28
    while font_size > 10:
        try:
            font = ImageFont.truetype(font_path, font_size)
        except OSError:
            # bundled font missing: fall back to PIL's default bitmap font
            font = ImageFont.load_default()
        if draw.textlength(name, font=font) <= label_px - 20:
            break
        font_size -= 1

    # still too wide at minimum size: truncate and append an ellipsis
    # (restored — the "…" literals were mangled to empty strings upstream)
    if draw.textlength(name, font=font) > label_px - 20:
        while draw.textlength(name + "…", font=font) > label_px - 20 and len(name) > 1:
            name = name[:-1]
        name += "…"

    text_x = (label_px - draw.textlength(name, font=font)) // 2
    text_y = 370
    draw.text((text_x, text_y), name, font=font, fill="black")

    buf = io.BytesIO()
    label_img.save(buf, format='PNG', dpi=(dpi, dpi))
    buf.seek(0)
    return send_file(
        buf,
        mimetype='image/png',
        as_attachment=True,
        download_name=filename
    )
@bp.route('/download_qr/<string:uuid_val>', methods=['GET'])
@login_required
def download_qr(uuid_val):
    """Download a QR label PNG for one of the current user's plants (404 otherwise)."""
    plant = Plant.query.filter_by(uuid=uuid_val, owner_id=current_user.id).first_or_404()
    if not plant.short_id:
        # lazily assign a short id the first time a label is requested
        plant.short_id = Plant.generate_short_id()
        db.session.commit()
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards')
    label_url = f"{base}/f/{plant.short_id}"
    label_name = f"{plant.short_id}.png"
    return generate_label_with_name(label_url, plant.common_name.name, label_name)
@bp.route('/download_qr_card/<string:uuid_val>', methods=['GET'])
def download_qr_card(uuid_val):
    """Download a QR card label PNG for any plant, looked up by UUID.

    NOTE(review): unlike download_qr, this route has no @login_required and
    no owner filter — presumably intentional for public plant cards; confirm.
    """
    plant = Plant.query.filter_by(uuid=uuid_val).first_or_404()
    if not plant.short_id:
        # lazily assign a short id the first time a card is requested
        plant.short_id = Plant.generate_short_id()
        db.session.commit()
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards')
    card_url = f"{base}/{plant.short_id}"
    card_name = f"{plant.short_id}_card.png"
    return generate_label_with_name(card_url, plant.common_name.name, card_name)