Files
natureinpots_community/plugins/utility/routes.py

508 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# File: plugins/utility/routes.py
# Standard library
import csv
import io
import os
import re
import uuid
import zipfile
import tempfile
import difflib
import traceback
from datetime import datetime
# Third-party
from flask import (
Blueprint, request, render_template, redirect, flash,
session, url_for, send_file, current_app
)
from flask_login import login_required, current_user
from flask_wtf.csrf import generate_csrf
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
# Application
from app import db
from app.neo4j_utils import get_neo4j_handler
# Plugins
from plugins.plant.models import (
Plant,
PlantCommonName,
PlantScientificName,
PlantOwnershipLog,
)
from plugins.media.models import Media
from plugins.utility.models import ImportBatch
from plugins.media.routes import _process_upload_file
# Blueprint for the utility plugin: routes registered on it are served under
# the /utility URL prefix and its templates are looked up in 'templates'.
bp = Blueprint(
    'utility', __name__,
    template_folder='templates', url_prefix='/utility',
)
@bp.route("/", methods=["GET"])
@login_required
def index():
    """Redirect the blueprint root to the upload page."""
    upload_url = url_for("utility.upload")
    return redirect(upload_url)
@bp.route("/imports", methods=["GET"])
@login_required
def imports():
    """Render the current user's 20 most recent import batches, newest first."""
    own_batches = ImportBatch.query.filter_by(user_id=current_user.id)
    newest_first = own_batches.order_by(ImportBatch.imported_at.desc())
    recent = newest_first.limit(20).all()
    return render_template("utility/imports.html", batches=recent)
# ────────────────────────────────────────────────────────────────────────────────
# Plant column names.
# NOTE(review): not referenced by the routes shown in this file; export_data
# writes its own header row, which has no "Short ID" column.
PLANT_HEADERS = [
    "UUID",
    "Type",
    "Name",
    "Scientific Name",
    "Vendor Name",
    "Price",
    "Mother UUID",
    "Notes",
    "Short ID",
]

# Media column names (same four names export_data writes as the media.csv header).
MEDIA_HEADERS = [
    "Plant UUID",
    "Image Path",
    "Uploaded At",
    "Source Type",
]

# Header names a CSV upload must contain; upload() rejects the file if any
# of these is absent from the parsed header row.
REQUIRED_HEADERS = {
    "uuid",
    "plant_type",
    "name",
    "scientific_name",
    "mother_uuid",
}
def _csv_cell(row, key):
    """Return the stripped text of ``row[key]``; "" when the cell is missing or None.

    ``csv.DictReader`` fills the cells of a short row with ``None``, so a plain
    ``row.get(key, "")`` can still hand back ``None``.
    """
    return (row.get(key) or "").strip()


def _handle_zip_upload(file):
    """Validate an uploaded export ZIP, record an ImportBatch and queue the import.

    Every rejection removes the temporary file, flashes a message and
    redirects back to the upload page.
    """
    # 1) Save to the shared UPLOAD_FOLDER under a throwaway name.
    upload_dir = current_app.config["UPLOAD_FOLDER"]
    os.makedirs(upload_dir, exist_ok=True)
    tmp_path = os.path.join(upload_dir, f"tmp_{uuid.uuid4().hex}.zip")
    file.save(tmp_path)

    # 2) Inspect the archive. The context manager closes the handle before
    #    the file is removed or renamed below.
    try:
        with zipfile.ZipFile(tmp_path) as zf:
            names = set(zf.namelist())
            has_csvs = {"plants.csv", "media.csv"}.issubset(names)
            meta = ""
            if has_csvs and "metadata.txt" in names:
                meta = zf.read("metadata.txt").decode("utf-8", "ignore")
    except zipfile.BadZipFile:
        os.remove(tmp_path)
        flash("Uploaded file is not a valid ZIP.", "danger")
        return redirect(request.url)

    if not has_csvs:
        os.remove(tmp_path)
        flash("ZIP must contain both plants.csv and media.csv", "danger")
        return redirect(request.url)

    # 3) Extract export_id from the "export_id,<value>" line of metadata.txt.
    export_id = None
    for line in meta.splitlines():
        if line.startswith("export_id,"):
            export_id = line.split(",", 1)[1].strip()
            break
    if not export_id:
        os.remove(tmp_path)
        flash("metadata.txt missing or missing export_id", "danger")
        return redirect(request.url)

    # 4) Prevent the same user from importing the same export twice.
    already_imported = ImportBatch.query.filter_by(
        export_id=export_id, user_id=current_user.id
    ).first()
    if already_imported:
        os.remove(tmp_path)
        flash("This export has already been imported.", "info")
        return redirect(request.url)

    # 5) Create the batch record; committing assigns batch.id.
    batch = ImportBatch(
        export_id=export_id,
        user_id=current_user.id,
        imported_at=datetime.utcnow(),
        status='pending',
    )
    db.session.add(batch)
    db.session.commit()

    # 6) Rename the ZIP so its name carries batch.id.
    final_path = os.path.join(upload_dir, f"{batch.id}_{uuid.uuid4().hex}.zip")
    os.rename(tmp_path, final_path)

    # 7) Enqueue the background task (imported here, at call time).
    from plugins.utility.tasks import import_text_data
    import_text_data.delay(final_path, "zip", batch.id)

    flash("ZIP received; import queued in background.", "success")
    return redirect(request.url)


def _handle_csv_upload(file):
    """Parse an uploaded plants CSV into session review rows and go to the review page.

    Rows without a uuid or a name are skipped. For each remaining row the
    closest known scientific name (difflib, cutoff 0.8) is offered as a
    suggestion when it differs from the name in the file.
    """
    try:
        text = file.stream.read().decode("utf-8-sig")
        reader = csv.DictReader(io.StringIO(text))
        headers = set(reader.fieldnames or [])
    except (UnicodeDecodeError, csv.Error):
        flash("Failed to read CSV file. Ensure it is valid UTF-8.", "error")
        return redirect(request.url)

    missing = REQUIRED_HEADERS - headers
    if missing:
        flash(f"Missing required CSV headers: {missing}", "error")
        return redirect(request.url)

    # Lower-cased name -> record, built once and reused for every row.
    sci_by_lower = {s.name.lower(): s for s in PlantScientificName.query.all()}
    known_sci = list(sci_by_lower)

    review_list = []
    for row in reader:
        uuid_val = _csv_cell(row, "uuid").strip('"')
        name = _csv_cell(row, "name")
        sci_name = _csv_cell(row, "scientific_name")
        plant_type = _csv_cell(row, "plant_type") or "plant"
        mother_uuid = _csv_cell(row, "mother_uuid").strip('"')

        # plant_type always has a value (defaults to "plant"), so only the
        # uuid and the name can make a row unusable.
        if not (uuid_val and name):
            continue

        matches = difflib.get_close_matches(
            sci_name.lower(), known_sci, n=1, cutoff=0.8
        )
        suggested = None
        if matches and matches[0] != sci_name.lower():
            suggested = sci_by_lower[matches[0]].name

        review_list.append({
            "uuid": uuid_val,
            "name": name,
            "sci_name": sci_name,
            "suggested": suggested,
            "plant_type": plant_type,
            "mother_uuid": mother_uuid,
        })

    # Both session keys carry the same rows; review() reads each of them.
    session["pending_rows"] = list(review_list)
    session["review_list"] = review_list
    return redirect(url_for("utility.review"))


def _handle_media_upload(file):
    """Store any other uploaded file under the user's dated folder and add a Media row."""
    # NOTE(review): no file-type restriction is applied on this path — confirm
    # that accepting arbitrary uploads here is intended.
    plugin = request.form.get("plugin", "")
    related_id = request.form.get("related_id", 0)
    plant_id = request.form.get("plant_id", None)
    growlog_id = request.form.get("growlog_id", None)
    caption = request.form.get("caption", None)

    now = datetime.utcnow()
    dated = now.strftime("%Y/%m/%d")
    unique_name = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"

    storage_path = os.path.join(
        current_app.config["UPLOAD_FOLDER"],
        str(current_user.id),
        dated,
    )
    os.makedirs(storage_path, exist_ok=True)
    file.save(os.path.join(storage_path, unique_name))

    media = Media(
        plugin=plugin,
        related_id=related_id,
        filename=unique_name,
        uploaded_at=now,
        uploader_id=current_user.id,
        caption=caption,
        plant_id=plant_id,
        growlog_id=growlog_id,
        created_at=now,
        file_url=f"/{current_user.id}/{dated}/{unique_name}",
    )
    db.session.add(media)
    db.session.commit()

    flash("File uploaded and saved successfully.", "success")
    return redirect(request.url)


@bp.route("/upload", methods=["GET", "POST"])
@login_required
def upload():
    """Render the upload form (GET) or dispatch an uploaded file (POST).

    POST dispatches on the lower-cased file name:
      * ``.zip`` -> validated and queued as a background import,
      * ``.csv`` -> parsed into session rows for the review page,
      * anything else -> stored as a media file.
    """
    if request.method != "POST":
        return render_template("utility/upload.html", csrf_token=generate_csrf())

    file = request.files.get("file")
    if not file or not file.filename:
        flash("No file selected", "error")
        return redirect(request.url)

    filename = file.filename.lower().strip()
    if filename.endswith(".zip"):
        return _handle_zip_upload(file)
    if filename.endswith(".csv"):
        return _handle_csv_upload(file)
    return _handle_media_upload(file)
@bp.route("/review", methods=["GET", "POST"])
@login_required
def review():
    """Show the pending CSV rows (GET) or import them (POST).

    On POST each row stored in ``session["pending_rows"]`` is processed:
    the common and scientific names are looked up or created, a Plant is
    created when its UUID is not yet known, and the plant (plus its lineage
    when a mother UUID is given) is written to Neo4j. A suggested scientific
    name is used only when the form carries ``confirm_<uuid> == "yes"``.
    """
    if request.method != "POST":
        return render_template(
            "utility/review.html",
            review_list=session.get("review_list", []),
            csrf_token=generate_csrf()
        )

    rows = session.get("pending_rows", [])
    added = 0
    neo = get_neo4j_handler()
    try:
        for row in rows:
            uuid_val = row["uuid"]
            name = row["name"]
            sci_name = row["sci_name"]
            suggested = row["suggested"]
            plant_type = row["plant_type"]
            mother_uuid = row["mother_uuid"]
            accepted = request.form.get(f"confirm_{uuid_val}") == "yes"

            # Common name: reuse an existing record or create one.
            common = PlantCommonName.query.filter_by(name=name).first()
            if not common:
                common = PlantCommonName(name=name)
                db.session.add(common)
                db.session.flush()  # assigns common.id

            # Scientific name: the suggestion wins only when the user accepted it.
            use_name = suggested if (suggested and accepted) else sci_name
            scientific = PlantScientificName.query.filter_by(name=use_name).first()
            if not scientific:
                scientific = PlantScientificName(
                    name=use_name,
                    common_id=common.id
                )
                db.session.add(scientific)
                db.session.flush()  # assigns scientific.id

            # Verified when there was nothing to confirm, or the user confirmed.
            verified = (not suggested) or accepted

            plant = Plant.query.filter_by(uuid=uuid_val).first()
            if not plant:
                plant = Plant(
                    uuid=uuid_val,
                    common_id=common.id,
                    scientific_id=scientific.id,
                    plant_type=plant_type,
                    owner_id=current_user.id,
                    mother_uuid=mother_uuid or None,
                    data_verified=verified
                )
                db.session.add(plant)
                db.session.flush()  # assigns plant.id for the ownership log

                log = PlantOwnershipLog(
                    plant_id=plant.id,
                    user_id=current_user.id,
                    date_acquired=datetime.utcnow(),
                    transferred=False,
                    is_verified=verified
                )
                db.session.add(log)
                added += 1

            # NOTE(review): other routes in this file read `plant.common_name`;
            # confirm that the Plant model really exposes `common` as well.
            neo.create_plant_node(plant.uuid, plant.common.name)
            if mother_uuid:
                neo.create_lineage(child_uuid=plant.uuid, parent_uuid=mother_uuid)

        db.session.commit()
    except Exception:
        # Drop the partially built transaction before re-raising.
        db.session.rollback()
        raise
    finally:
        # Release the Neo4j handler even when a row fails.
        neo.close()

    flash(f"{added} plants added (MySQL) and Neo4j updated.", "success")
    session.pop("pending_rows", None)
    session.pop("review_list", None)
    return redirect(url_for("utility.upload"))
@bp.route('/export_data', methods=['GET'])
@login_required
def export_data():
    """Send the current user's plants and media as a downloadable ZIP.

    The archive holds ``metadata.txt`` (export id, user id, export time),
    ``plants.csv``, ``media.csv`` and, under ``images/``, every referenced
    media file that exists below UPLOAD_FOLDER.
    """
    export_id = f"{uuid.uuid4()}_{int(datetime.utcnow().timestamp())}"

    # plants.csv
    plants = (
        Plant.query.filter_by(owner_id=current_user.id)
        .order_by(Plant.id).all()
    )
    plant_io = io.StringIO()
    pw = csv.writer(plant_io)
    pw.writerow([
        'UUID', 'Type', 'Name', 'Scientific Name',
        'Vendor Name', 'Price', 'Mother UUID', 'Notes'
    ])
    for p in plants:
        pw.writerow([
            p.uuid,
            p.plant_type,
            p.common_name.name if p.common_name else '',
            p.scientific_name.name if p.scientific_name else '',
            getattr(p, 'vendor_name', '') or '',
            getattr(p, 'price', '') or '',
            p.mother_uuid or '',
            p.notes or ''
        ])
    plants_csv = plant_io.getvalue()

    # media.csv: only media the user uploaded that is attached to a plant
    media_records = (
        Media.query.filter(
            Media.uploader_id == current_user.id,
            Media.plant_id.isnot(None)
        ).order_by(Media.id).all()
    )
    media_io = io.StringIO()
    mw = csv.writer(media_io)
    mw.writerow(['Plant UUID', 'Image Path', 'Uploaded At', 'Source Type'])
    for m in media_records:
        mw.writerow([
            # A dangling plant_id yields no related plant; export an empty UUID.
            m.plant.uuid if m.plant else '',
            m.file_url,
            m.uploaded_at.isoformat() if m.uploaded_at else '',
            # NOTE(review): the "Source Type" column is filled from the
            # caption — confirm this mapping is intended.
            m.caption or ''
        ])
    media_csv = media_io.getvalue()

    # assemble ZIP
    zip_buf = io.BytesIO()
    media_root = os.path.abspath(current_app.config['UPLOAD_FOLDER'])
    root_prefix = os.path.join(media_root, '')  # media_root with a trailing separator
    with zipfile.ZipFile(zip_buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        meta = (
            f"export_id,{export_id}\n"
            f"user_id,{current_user.id}\n"
            f"exported_at,{datetime.utcnow().isoformat()}\n"
        )
        zf.writestr('metadata.txt', meta)
        zf.writestr('plants.csv', plants_csv)
        zf.writestr('media.csv', media_csv)

        for m in media_records:
            if not m.file_url:
                continue
            # Strip leading slashes: os.path.join discards media_root when the
            # second component is absolute, and upload() stores file_url
            # as "/<user>/<Y>/<m>/<d>/<name>".
            rel = m.file_url.split('uploads/', 1)[-1].lstrip('/')
            abs_path = os.path.abspath(os.path.join(media_root, rel))
            # Never archive anything that resolves outside the upload root.
            if not abs_path.startswith(root_prefix):
                continue
            if os.path.isfile(abs_path):
                arcname = os.path.join(
                    'images', os.path.relpath(abs_path, media_root)
                )
                zf.write(abs_path, arcname)
    zip_buf.seek(0)

    safe_email = re.sub(r'\W+', '_', current_user.email)
    filename = f"{safe_email}_export_{export_id}.zip"
    return send_file(
        zip_buf,
        mimetype='application/zip',
        as_attachment=True,
        download_name=filename
    )
def generate_label_with_name(qr_url, name, filename):
    """Build a square PNG label (QR code above a name) and return it as a download.

    Args:
        qr_url: Text encoded into the QR code.
        name: Caption drawn under the code; ``None`` is treated as "".
        filename: Download name given to the PNG attachment.

    Returns:
        The ``send_file`` response carrying the PNG.

    The label is 1.5 x 300 = 450 px per side and saved with 300 dpi metadata.
    The caption starts at font size 28 and shrinks one step at a time (down to
    11) until it fits the label width minus 20 px; if it still does not fit it
    is shortened and an ellipsis is appended.
    """
    from PIL import Image, ImageDraw, ImageFont
    import qrcode
    from qrcode.image.pil import PilImage
    from qrcode.constants import ERROR_CORRECT_H

    # --- QR code ---------------------------------------------------------
    qr = qrcode.QRCode(
        version=2,
        error_correction=ERROR_CORRECT_H,
        box_size=10,
        border=1
    )
    qr.add_data(qr_url)
    qr.make(fit=True)
    qr_img = qr.make_image(
        image_factory=PilImage,
        fill_color="black",
        back_color="white"
    ).convert("RGB")

    # --- Label canvas with the code centred horizontally ------------------
    dpi = 300
    label_px = int(1.5 * dpi)
    label_img = Image.new("RGB", (label_px, label_px), "white")
    qr_size = 350
    qr_img = qr_img.resize((qr_size, qr_size), Image.LANCZOS)
    qr_x = (label_px - qr_size) // 2
    label_img.paste(qr_img, (qr_x, 10))

    # --- Caption ----------------------------------------------------------
    font_path = os.path.abspath(
        os.path.join(
            current_app.root_path, '..', 'font', 'ARIALLGT.TTF'
        )
    )
    draw = ImageDraw.Draw(label_img)
    name = (name or '').strip()
    max_width = label_px - 20

    # NOTE(review): the truncation suffix was an empty literal in the reviewed
    # text, which made `name + ""` a no-op; an ellipsis is assumed to be the
    # intended suffix — confirm.
    ellipsis = "\u2026"

    font_size = 28
    while font_size > 10:
        try:
            font = ImageFont.truetype(font_path, font_size)
        except OSError:
            # The font file is unavailable. Fall back once instead of
            # reloading the same default font for every remaining size.
            font = ImageFont.load_default()
            # The built-in fallback font may not cover U+2026; use ASCII dots.
            ellipsis = "..."
            break
        if draw.textlength(name, font=font) <= max_width:
            break
        font_size -= 1

    if draw.textlength(name, font=font) > max_width:
        # Drop trailing characters until the name plus the ellipsis fits.
        while len(name) > 1 and draw.textlength(name + ellipsis, font=font) > max_width:
            name = name[:-1]
        name += ellipsis

    text_x = (label_px - draw.textlength(name, font=font)) // 2
    text_y = 370
    draw.text((text_x, text_y), name, font=font, fill="black")

    buf = io.BytesIO()
    label_img.save(buf, format='PNG', dpi=(dpi, dpi))
    buf.seek(0)
    return send_file(
        buf,
        mimetype='image/png',
        as_attachment=True,
        download_name=filename
    )
@bp.route('/download_qr/<string:uuid_val>', methods=['GET'])
@login_required
def download_qr(uuid_val):
    """Download a QR label for one of the current user's plants.

    Responds 404 unless a plant with this UUID is owned by the current user.
    A short id is generated and committed the first time one is needed; the
    QR code encodes ``<PLANT_CARDS_BASE_URL>/f/<short_id>``.
    """
    p = Plant.query.filter_by(uuid=uuid_val, owner_id=current_user.id).first_or_404()
    if not p.short_id:
        p.short_id = Plant.generate_short_id()
        db.session.commit()

    # Tolerate a configured base URL that ends with a slash.
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards').rstrip('/')
    qr_url = f"{base}/f/{p.short_id}"
    filename = f"{p.short_id}.png"

    # A plant may have no common name (export_data guards the same case);
    # the label is then rendered without a caption.
    label_name = p.common_name.name if p.common_name else ''
    return generate_label_with_name(qr_url, label_name, filename)
# NOTE(review): unlike download_qr, this route has no @login_required and no
# owner filter, yet it can write a new short_id to the database on a GET.
# Confirm that anonymous access is intended.
@bp.route('/download_qr_card/<string:uuid_val>', methods=['GET'])
def download_qr_card(uuid_val):
    """Download a QR card label for the plant with the given UUID.

    Responds 404 when no plant has this UUID. A short id is generated and
    committed the first time one is needed; the QR code encodes
    ``<PLANT_CARDS_BASE_URL>/<short_id>``.
    """
    p = Plant.query.filter_by(uuid=uuid_val).first_or_404()
    if not p.short_id:
        p.short_id = Plant.generate_short_id()
        db.session.commit()

    # Tolerate a configured base URL that ends with a slash.
    base = current_app.config.get('PLANT_CARDS_BASE_URL', 'https://plant.cards').rstrip('/')
    qr_url = f"{base}/{p.short_id}"
    filename = f"{p.short_id}_card.png"

    # A plant may have no common name (export_data guards the same case);
    # the label is then rendered without a caption.
    label_name = p.common_name.name if p.common_name else ''
    return generate_label_with_name(qr_url, label_name, filename)