Files
natureinpots_community/plugins/importer/routes.py
2025-06-03 20:46:11 -05:00

175 lines
5.9 KiB
Python

# plugins/importer/routes.py
import csv
import io
import re
import uuid
from flask import (
Blueprint, flash, redirect, render_template, request, url_for
)
from flask_login import login_required, current_user
from sqlalchemy.exc import SQLAlchemyError
from app import db
from plugins.plant.models import (
Plant, PlantCommonName, PlantScientificName, PlantLineage
)
# Blueprint setup
bp = Blueprint(
'importer',
__name__,
template_folder='templates',
url_prefix='/import'
)
# Expected CSV headers (exact, in this order)
EXPECTED_HEADERS = ['uuid', 'plant_type', 'name', 'scientific_name', 'mother_uuid']
# Strict regex for UUIDv4
UUID_REGEX = re.compile(
r'^[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}$'
)
def is_valid_uuid(u: str) -> bool:
"""Return True if 'u' matches UUIDv4 format exactly."""
return bool(UUID_REGEX.match(u))
def sanitize_field(field_value: str) -> str:
"""
Prevent CSV-injection by prefixing any cell that starts with
'=', '+', '-', or '@' with a space.
"""
if isinstance(field_value, str) and field_value and field_value[0] in ['=', '+', '-', '@']:
return ' ' + field_value
return field_value
def validate_row(row: dict, line_number: int):
"""
Validate a single CSV row. Returns (cleaned_row_dict, errors_list).
If errors_list is not empty, the row is invalid.
"""
errors = []
cleaned = {}
# 1) uuid: if provided, must be valid; otherwise generate a new one
raw_uuid = row.get('uuid', '').strip()
if raw_uuid:
if not is_valid_uuid(raw_uuid):
errors.append(f"Line {line_number}: Invalid UUID format '{raw_uuid}'.")
else:
cleaned['uuid'] = raw_uuid
else:
# auto-generate
cleaned['uuid'] = str(uuid.uuid4())
# 2) plant_type: required, <= 50 chars
plant_type = row.get('plant_type', '').strip()
if not plant_type:
errors.append(f"Line {line_number}: 'plant_type' is required.")
elif len(plant_type) > 50:
errors.append(f"Line {line_number}: 'plant_type' exceeds 50 characters.")
else:
cleaned['plant_type'] = sanitize_field(plant_type)
# 3) name: required, <= 100 chars
name = row.get('name', '').strip()
if not name:
errors.append(f"Line {line_number}: 'name' (common name) is required.")
elif len(name) > 100:
errors.append(f"Line {line_number}: 'name' exceeds 100 characters.")
else:
cleaned['name'] = sanitize_field(name)
# 4) scientific_name: required, <= 100 chars
sci = row.get('scientific_name', '').strip()
if not sci:
errors.append(f"Line {line_number}: 'scientific_name' is required.")
elif len(sci) > 100:
errors.append(f"Line {line_number}: 'scientific_name' exceeds 100 characters.")
else:
cleaned['scientific_name'] = sanitize_field(sci)
# 5) mother_uuid: optional. If present (and not 'N/A'), must be valid
raw_mother = row.get('mother_uuid', '').strip()
if raw_mother and raw_mother.upper() != 'N/A':
if not is_valid_uuid(raw_mother):
errors.append(f"Line {line_number}: 'mother_uuid' has invalid UUID '{raw_mother}'.")
else:
cleaned['mother_uuid'] = raw_mother
else:
cleaned['mother_uuid'] = None
if errors:
return None, errors
return cleaned, None
@bp.route("/", methods=["GET", "POST"])
@login_required
def upload_csv():
headers = [
"name", "scientific_name", "type", "status", "description"
]
if request.method == "POST":
file = request.files.get("file")
if not file or not file.filename.endswith(".csv"):
flash("Please upload a valid CSV file.", "danger")
return redirect(request.url)
try:
stream = io.StringIO(file.stream.read().decode("UTF-8"))
csv_reader = csv.reader(stream)
data = list(csv_reader)
if not data or data[0] != headers:
flash("CSV headers must exactly match: " + ", ".join(headers), "danger")
return redirect(request.url)
# Skip header row
rows = data[1:]
with db.session.begin_nested(): # rollback safe transaction
for row in rows:
name, sci_name, plant_type, plant_status, desc = row
# Get or create scientific name entry
sci = PlantScientificName.query.filter_by(name=sci_name.strip()).first()
if not sci:
sci = PlantScientificName(name=sci_name.strip())
db.session.add(sci)
db.session.flush() # ensure sci.id is available
# Create plant entry
new_plant = Plant(
uuid=str(uuid4()),
name=name.strip(),
scientific_name_id=sci.id,
type=plant_type.strip(),
status=plant_status.strip(),
description=desc.strip()
)
db.session.add(new_plant)
db.session.flush() # get new_plant.id for ownership log
# Log ownership
log = PlantOwnershipLog(
plant_id=new_plant.id,
user_id=current_user.id,
date_acquired=datetime.utcnow()
)
db.session.add(log)
db.session.commit()
flash(f"Successfully imported {len(rows)} plants.", "success")
return redirect(url_for("importer.upload_csv"))
except Exception as e:
db.session.rollback()
flash(f"Error importing CSV: {str(e)}", "danger")
return redirect(request.url)
return render_template("importer/upload.html", headers=", ".join(headers))