159 lines
6.0 KiB
Python
159 lines
6.0 KiB
Python
import csv
|
|
import io
|
|
import difflib
|
|
from flask import Blueprint, request, render_template, redirect, flash, session, url_for
|
|
from flask_login import login_required, current_user
|
|
from app.neo4j_utils import get_neo4j_handler
|
|
from plugins.plant.models import db, Plant, PlantCommon, PlantScientific
|
|
|
|
# Blueprint named "importer" that carries the CSV import routes defined in
# this module; its template_folder is set to "templates".
bp = Blueprint("importer", __name__, template_folder="templates")
|
|
|
|
# Column names that must all appear in the header row of an uploaded CSV;
# upload() rejects the file when any of them is missing.
REQUIRED_HEADERS = {"uuid", "plant_type", "name"}
|
|
|
|
def _clean_cell(row, key, default=""):
    """Return the stripped text stored under ``key`` in a CSV row.

    ``default`` is used only when the column is absent from the row. A cell
    that is ``None`` (``csv.DictReader`` pads short rows with ``None``) or
    empty yields ``""`` instead of raising on ``.strip()``.
    """
    value = row.get(key, default)
    return value.strip() if value else ""


def _resolve_scientific_name(name, sci_name, all_common, all_scientific):
    """Work out the scientific name and any suggestion for one CSV row.

    Args:
        name: Common name from the row (already stripped).
        sci_name: Scientific name from the row (already stripped, may be "").
        all_common: Mapping of lower-cased common name -> ``PlantCommon``.
        all_scientific: Mapping of lower-cased scientific name ->
            ``PlantScientific``.

    Returns:
        A ``(sci_name, suggested_match)`` tuple. ``suggested_match`` is
        ``None`` unless a fuzzy match produced a candidate name.
    """
    name_lc = name.lower()
    sci_lc = sci_name.lower()

    if sci_lc:
        if sci_lc in all_scientific:
            return sci_name, None
        # Unknown scientific name: offer the closest known one, but keep the
        # user's spelling as the value to store unless they confirm it.
        close = difflib.get_close_matches(sci_lc, all_scientific, n=1, cutoff=0.85)
        return sci_name, (all_scientific[close[0]].name if close else None)

    # No scientific name supplied: try to infer it from the common name.
    common = all_common.get(name_lc)
    if common is not None:
        sci_obj = PlantScientific.query.filter_by(common_id=common.id).first()
        return (sci_obj.name if sci_obj else sci_name), None

    close_common = difflib.get_close_matches(name_lc, all_common, n=1, cutoff=0.85)
    if close_common:
        matched = all_common[close_common[0]]
        sci_obj = PlantScientific.query.filter_by(common_id=matched.id).first()
        if sci_obj:
            # Inferred through a fuzzy common-name match, so it is both the
            # working value and a suggestion that needs review.
            return sci_obj.name, sci_obj.name

    return sci_name, None


@bp.route("/import/", methods=["GET", "POST"])
@login_required
def upload():
    """Accept a CSV upload, stage its rows in the session, and go to review.

    GET renders ``importer/upload.html``. POST reads the uploaded ``file``
    field as UTF-8 CSV (a leading BOM is tolerated), checks that the header
    row contains every name in ``REQUIRED_HEADERS``, and for each row with a
    non-empty ``uuid``, ``name`` and ``plant_type`` stores a dict in
    ``session["pending_rows"]``. Rows whose scientific name was fuzzy-matched
    to a different value are also listed in ``session["review_list"]``.

    Returns:
        A redirect to ``importer.review`` on success, a redirect back to this
        URL when the file or required headers are missing, or the rendered
        upload form on GET and after a failed import.
    """
    if request.method == "POST":
        uploaded = request.files.get("file")
        if not uploaded:
            flash("No file uploaded.", "error")
            return redirect(request.url)

        try:
            decoded = uploaded.read().decode("utf-8-sig")
            reader = csv.DictReader(io.StringIO(decoded))

            # fieldnames is None for an empty file; treat that as "no headers"
            # so the user gets the missing-headers message, not a TypeError.
            headers = set(reader.fieldnames or ())
            if not REQUIRED_HEADERS.issubset(headers):
                flash(f"Missing required CSV headers: {REQUIRED_HEADERS - headers}", "error")
                return redirect(request.url)

            all_common = {c.name.lower(): c for c in PlantCommon.query.all()}
            all_scientific = {s.name.lower(): s for s in PlantScientific.query.all()}

            pending_rows = []
            review_list = []

            for row in reader:
                plant_uuid = _clean_cell(row, "uuid")
                name = _clean_cell(row, "name")
                plant_type = _clean_cell(row, "plant_type", "plant")

                # Rows missing any required value are skipped silently.
                if not (plant_uuid and name and plant_type):
                    continue

                original_input = _clean_cell(row, "scientific_name")
                mother_uuid = _clean_cell(row, "mother_uuid")

                sci_name, suggested_match = _resolve_scientific_name(
                    name, original_input, all_common, all_scientific
                )

                pending_rows.append({
                    "uuid": plant_uuid,
                    "name": name,
                    "sci_name": sci_name,
                    "original_sci_name": original_input,
                    "plant_type": plant_type,
                    "mother_uuid": mother_uuid,
                    "suggested_scientific_name": suggested_match,
                })

                if suggested_match and suggested_match != original_input:
                    review_list.append({
                        "uuid": plant_uuid,
                        "common_name": name,
                        "user_input": original_input or "(blank)",
                        "suggested_name": suggested_match,
                    })

            # Write both keys only after the whole file parsed, so a failure
            # part-way through never leaves half-staged rows in the session.
            # NOTE(review): if the app uses Flask's default cookie-backed
            # session, large uploads may exceed the cookie size limit --
            # confirm which session backend is configured.
            session["pending_rows"] = pending_rows
            session["review_list"] = review_list
            return redirect(url_for("importer.review"))

        except Exception as e:
            # Route-level boundary: report decode/CSV/database errors to the
            # user and fall through to re-render the upload form.
            flash(f"Import failed: {e}", "error")

    return render_template("importer/upload.html")
|
|
|
|
|
|
@bp.route("/import/review", methods=["GET", "POST"])
@login_required
def review():
    """Show fuzzy-match suggestions and, on POST, persist the staged rows.

    GET renders ``importer/review.html`` with ``session["review_list"]``.
    POST walks ``session["pending_rows"]``; for each row it looks up or
    creates the ``PlantCommon`` and ``PlantScientific`` records, creates a
    ``Plant`` owned by the current user when no plant with that uuid exists,
    and creates the plant node (plus a parent node and lineage link when
    ``mother_uuid`` is set) through the Neo4j handler. A suggested scientific
    name is used only when the form contains a ``confirm_<uuid>`` value for
    that row.

    Returns:
        The rendered review page on GET, or a redirect to ``importer.upload``
        after a successful POST.

    Raises:
        Any exception raised while persisting is re-raised after the SQL
        session is rolled back and the Neo4j handler is closed.
    """
    rows = session.get("pending_rows", [])
    review_list = session.get("review_list", [])

    if request.method != "POST":
        return render_template("importer/review.html", review_list=review_list)

    neo = get_neo4j_handler()
    added = 0

    try:
        for row in rows:
            plant_uuid = row["uuid"]
            name = row["name"]
            sci_name = row["sci_name"]
            plant_type = row["plant_type"]
            mother_uuid = row["mother_uuid"]
            suggested = row.get("suggested_scientific_name")

            common = PlantCommon.query.filter_by(name=name).first()
            if not common:
                common = PlantCommon(name=name)
                db.session.add(common)
                # Flush so common.id is available for the records below.
                db.session.flush()

            accepted = request.form.get(f"confirm_{plant_uuid}")
            sci_name_to_use = suggested if (suggested and accepted) else sci_name

            # NOTE(review): sci_name_to_use can be "" when the CSV gave no
            # scientific name and none was inferred; such rows share a single
            # blank-named PlantScientific record -- confirm this is intended.
            scientific = PlantScientific.query.filter_by(name=sci_name_to_use).first()
            if not scientific:
                scientific = PlantScientific(name=sci_name_to_use, common_id=common.id)
                db.session.add(scientific)
                db.session.flush()

            plant = Plant.query.filter_by(uuid=plant_uuid).first()
            if not plant:
                plant = Plant(
                    uuid=plant_uuid,
                    common_id=common.id,
                    scientific_id=scientific.id,
                    plant_type=plant_type,
                    owner_id=current_user.id,
                    is_verified=bool(accepted),
                )
                db.session.add(plant)
                added += 1

            # Graph writes happen for every staged row, including rows whose
            # Plant record already existed.
            neo.create_plant_node(plant_uuid, name)
            if mother_uuid:
                neo.create_plant_node(mother_uuid, "Parent")
                neo.create_lineage(plant_uuid, mother_uuid)

        db.session.commit()
    except Exception:
        # Leave the SQL session clean for the rest of the request; the error
        # itself still propagates to the caller as before.
        db.session.rollback()
        raise
    finally:
        # Always release the graph handler, even when a row fails.
        neo.close()

    flash(f"{added} plants added.", "success")
    session.pop("pending_rows", None)
    session.pop("review_list", None)
    return redirect(url_for("importer.upload"))
|