Files
familienarchiv/tools/import-normalizer/normalize.py
Marcel 8cac63e938
All checks were successful
CI / Unit & Component Tests (pull_request) Successful in 3m32s
CI / OCR Service Tests (pull_request) Successful in 19s
CI / Backend Unit Tests (pull_request) Successful in 3m26s
CI / fail2ban Regex (pull_request) Successful in 47s
CI / Semgrep Security Scan (pull_request) Successful in 21s
CI / Compose Bucket Idempotency (pull_request) Successful in 1m0s
feat(normalizer): drop unmatched-names.csv; unresolved-names is the names report
The unmatched list was just non-family correspondents (expected noise);
their count stays in summary.txt and they remain in canonical-persons.xlsx.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 16:46:08 +02:00

159 lines
7.4 KiB
Python

"""Orchestrator: read raw workbooks -> canonical outputs + review reports."""
import argparse
from collections import Counter
from pathlib import Path
import config
import ingest
import persons
import documents
import overrides as overrides_mod
import writers
def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
        out_dir, review_dir, date_overrides, name_overrides) -> dict:
    """Run one normalization pass and return the summary statistics.

    The person sheet is read first because its register feeds the
    name-resolution context. The document sheet is then walked row by row and
    every row that survives triage is converted into a canonical document.
    Canonical workbooks are written below ``out_dir``; the review CSVs and
    ``summary.txt`` are written below ``review_dir``. This function does not
    create either directory itself.

    Args:
        document_workbook: Workbook handed to ``ingest.read_sheet`` for the
            document rows.
        document_sheet: Sheet selector handed to ``ingest.read_sheet`` for the
            document rows; the first row returned is used as the header.
        person_workbook: Workbook handed to ``ingest.read_sheet`` for the
            person register.
        person_sheet: Sheet selector for the person register; the first row
            returned is used as the header.
        out_dir: Target directory for ``canonical-documents.xlsx`` and
            ``canonical-persons.xlsx`` (coerced to ``Path``).
        review_dir: Target directory for the review CSVs and ``summary.txt``
            (coerced to ``Path``).
        date_overrides: Container of date overrides. It is membership-tested
            with the stripped raw date, sized for the summary, and passed on
            to ``documents.to_canonical``.
        name_overrides: Name overrides passed to
            ``persons.ResolutionContext``; also sized for the summary.

    Returns:
        The ``dict`` of summary statistics, in the same key order that is
        written to ``summary.txt``. Keys starting with ``#`` carry an empty
        string and separate the groups of counters.
    """
    out_dir = Path(out_dir)
    review_dir = Path(review_dir)

    # --- persons ---
    person_rows = ingest.read_sheet(person_workbook, person_sheet)
    person_columns, _ = ingest.build_header_map(
        person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
    person_dicts = []
    for person_row in person_rows[1:]:
        # Rows shorter than the mapped column position yield "" for that field.
        person_dicts.append({
            field: (person_row[col] if col < len(person_row) else "")
            for field, col in person_columns.items()
        })
    register = persons.parse_register(person_dicts)
    alias_index = persons.AliasIndex(register)
    given_names = persons.build_given_names(register, config.EXTRA_GIVEN_NAMES)
    ctx = persons.ResolutionContext(alias_index, name_overrides, given_names=given_names)

    # --- documents ---
    doc_rows = ingest.read_sheet(document_workbook, document_sheet)
    doc_columns, unknown_headers = ingest.build_header_map(
        doc_rows[0], config.DOCUMENT_HEADER_MAP, config.DOCUMENT_REQUIRED_FIELDS)
    index_col = doc_columns["index"]

    canon_docs = []
    blank_index = []
    skipped_x = []
    mismatches = []
    unparsed_by_raw: dict[str, list] = {}
    dates_by_override = 0
    empty_count = 0
    seen_index = Counter()

    # Data rows are numbered from 2; the header row (doc_rows[0]) is row 1.
    for source_row, cells in enumerate(doc_rows[1:], start=2):
        verdict = documents.triage(cells, index_col)

        if verdict is documents.Triage.EMPTY:
            empty_count += 1
            continue

        if verdict is documents.Triage.BLANK_INDEX:
            kind = documents.classify_blank_index(cells, doc_columns)
            content = " | ".join(cell for cell in cells if cell)
            blank_index.append([source_row, kind, content])
            continue

        if verdict is documents.Triage.X_SUFFIX:
            suffixed = (cells[index_col] or "").strip()
            # The base index is the suffixed index minus its final character.
            skipped_x.append([source_row, suffixed, suffixed[:-1]])
            continue

        raw = documents.extract_row(cells, doc_columns, source_row)
        seen_index[raw.index] += 1

        stripped_date = raw.date.strip()
        if stripped_date and stripped_date in date_overrides:
            dates_by_override += 1

        doc = documents.to_canonical(raw, ctx, date_overrides)
        if "unparsed_date" in doc.needs_review:
            # Grouped by the raw (unstripped) date text.
            unparsed_by_raw.setdefault(raw.date, []).append(source_row)
        if "index_file_mismatch" in doc.needs_review:
            mismatches.append([source_row, raw.index, raw.file])
        canon_docs.append(doc)

    # REQ-TRIAGE-01: flag EVERY occurrence of a duplicated index and report all of them.
    dup_indexes = {index for index, seen in seen_index.items() if seen > 1}
    duplicates = []
    for doc in canon_docs:
        if doc.index not in dup_indexes:
            continue
        if "duplicate_index" not in doc.needs_review:
            doc.needs_review.append("duplicate_index")
        duplicates.append([doc.source_row, doc.index])

    all_people = register + list(ctx.provisional.values())

    # --- write canonical outputs ---
    writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
    writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")

    # --- review files ---
    # Unparsed dates: most frequent first (ties by raw text), with up to five example
    # source rows and two blank override cells, so a corrected row can be pasted straight
    # into overrides/dates.csv (same raw,iso,precision shape).
    unparsed_rows = [
        [raw_date, len(rows), " ".join(str(n) for n in rows[:5]), "", ""]
        for raw_date, rows in unparsed_by_raw.items()
    ]
    unparsed_rows.sort(key=lambda entry: (-entry[1], entry[0]))
    writers.write_review_csv(
        review_dir / "unparsed-dates.csv",
        ["raw", "count", "example_rows", "suggested_iso", "suggested_precision"],
        unparsed_rows)
    writers.write_review_csv(
        review_dir / "duplicate-index.csv", ["source_row", "index"], duplicates)
    writers.write_review_csv(
        review_dir / "blank-index-rows.csv", ["source_row", "kind", "content"], blank_index)
    writers.write_review_csv(
        review_dir / "skipped-x-suffix.csv", ["source_row", "index", "base_index"], skipped_x)

    # Unresolved names: one line per (category, name), ordered by category, then by
    # descending occurrence count, then by name; example rows are the five lowest.
    unresolved_agg: dict[tuple, list] = {}
    for name, category, row in ctx.unresolved:
        unresolved_agg.setdefault((category, name), []).append(row)
    unresolved_rows = [
        [category, name, len(rows), " ".join(str(n) for n in sorted(rows)[:5])]
        for (category, name), rows in unresolved_agg.items()
    ]
    unresolved_rows.sort(key=lambda entry: (entry[0], -entry[2], entry[1]))
    writers.write_review_csv(
        review_dir / "unresolved-names.csv",
        ["category", "raw", "count", "example_rows"],
        unresolved_rows)
    writers.write_review_csv(
        review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)

    # --- summary ---
    # Only documents with a non-blank raw date take part in the unknown-date rate.
    dated = 0
    unknown = 0
    for entry in canon_docs:
        if not entry.date_raw.strip():
            continue
        dated += 1
        if entry.date_precision == "UNKNOWN":
            unknown += 1
    if dated:
        unknown_rate = f"{(100 * unknown / dated):.1f}%"
    else:
        unknown_rate = "0.0%"

    # One tally of unresolved occurrences per category; absent categories read as 0.
    by_category = Counter(category for _, category, _ in ctx.unresolved)

    stats = {
        "# INPUTS": "",
        "document_rows_read": len(doc_rows) - 1,
        "register_persons": len(register),
        "unknown_headers": ", ".join(unknown_headers) or "(none)",
        "# OUTPUTS": "",
        "documents_emitted": len(canon_docs),
        "provisional_persons": len(ctx.provisional),
        "# DATES": "",
        "dated_rows": dated,
        "unparsed_dates": unknown,
        "unknown_date_rate": f"{unknown_rate} (target <=5%)",
        "distinct_unparsed_formats": len(unparsed_by_raw),
        "# NAMES": "",
        "unmatched_name_strings": len(ctx.unmatched),
        "unresolved_name_occurrences": len(ctx.unresolved),
        "unresolved_unknown": by_category["unknown"],
        "unresolved_single_token": by_category["single_token"],
        "unresolved_relational": by_category["relational"],
        "unresolved_collective": by_category["collective"],
        "unresolved_prose": by_category["prose"],
        "unresolved_ambiguous_pair": by_category["ambiguous_pair"],
        "# ANOMALIES": "",
        "empty_rows": empty_count,
        "blank_index_rows": len(blank_index),
        "skipped_x_suffix": len(skipped_x),
        "duplicate_index_rows": len(duplicates),
        "index_file_mismatches": len(mismatches),
        "# OVERRIDES": "",
        "date_overrides_loaded": len(date_overrides),
        "name_overrides_loaded": len(name_overrides),
        "dates_resolved_by_override": dates_by_override,
        "names_resolved_by_override": ctx.override_hits,
    }
    writers.write_summary(review_dir / "summary.txt", stats)
    return stats
def main():
    """Command-line entry point: load overrides, run the normalizer, print the stats.

    No command-line options are defined; the parser is invoked only so that
    ``--help`` works and unexpected arguments are rejected. All inputs and
    output locations come from ``config``.
    """
    cli = argparse.ArgumentParser(description="Normalize the family archive spreadsheets.")
    cli.parse_args()

    overrides_dir = config.OVERRIDES_DIR
    date_overrides, name_overrides = overrides_mod.load_overrides(
        overrides_dir / "dates.csv",
        overrides_dir / "names.csv",
    )

    summary = run(
        document_workbook=config.DOCUMENT_WORKBOOK,
        document_sheet=config.DOCUMENT_SHEET,
        person_workbook=config.PERSON_WORKBOOK,
        person_sheet=config.PERSON_SHEET,
        out_dir=config.OUT_DIR,
        review_dir=config.REVIEW_DIR,
        date_overrides=date_overrides,
        name_overrides=name_overrides,
    )

    # Echo the same key/value pairs that run() returned, one per line.
    print("Normalization complete:")
    for label, value in summary.items():
        print(f" {label}: {value}")
# Run the normalizer only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()