From 18d5a1e2da0d8a483cd68871be68d23522ad6c6e Mon Sep 17 00:00:00 2001
From: Marcel
Date: Mon, 25 May 2026 14:46:13 +0200
Subject: [PATCH] feat(normalizer): orchestrator + end-to-end integration test

Co-Authored-By: Claude Opus 4.7
---
 tools/import-normalizer/normalize.py          | 176 ++++++++++++++++++
 .../import-normalizer/tests/test_normalize.py |  62 +++++++
 2 files changed, 238 insertions(+)
 create mode 100644 tools/import-normalizer/normalize.py
 create mode 100644 tools/import-normalizer/tests/test_normalize.py

diff --git a/tools/import-normalizer/normalize.py b/tools/import-normalizer/normalize.py
new file mode 100644
index 00000000..cabbe45c
--- /dev/null
+++ b/tools/import-normalizer/normalize.py
@@ -0,0 +1,176 @@
+"""Orchestrator: read raw workbooks -> canonical outputs + review reports."""
+import argparse
+from collections import Counter
+from pathlib import Path
+
+import config
+import ingest
+import persons
+import documents
+import overrides as overrides_mod
+import writers
+
+
+def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
+        out_dir, review_dir, date_overrides, name_overrides) -> dict:
+    """Normalize both workbooks and write canonical outputs plus review files.
+
+    Reads the person register and the document sheet, triages every document
+    row, converts the remaining rows to canonical documents, and writes the
+    canonical workbooks into ``out_dir`` and the review files into ``review_dir``.
+
+    Args:
+        document_workbook: Passed to ``ingest.read_sheet`` for the document rows.
+        document_sheet: Sheet name passed along with ``document_workbook``.
+        person_workbook: Passed to ``ingest.read_sheet`` for the person register.
+        person_sheet: Sheet name passed along with ``person_workbook``.
+        out_dir: Directory for ``canonical-documents.xlsx`` and ``canonical-persons.xlsx``.
+        review_dir: Directory for the review CSV files and ``summary.txt``.
+        date_overrides: Date overrides; probed here with the stripped raw date.
+        name_overrides: Name overrides handed to ``persons.ResolutionContext``.
+
+    Returns:
+        The statistics dict that is also passed to ``writers.write_summary``.
+        Keys starting with ``#`` are section markers with an empty value.
+    """
+    out_dir, review_dir = Path(out_dir), Path(review_dir)
+
+    # --- persons ---
+    person_rows = ingest.read_sheet(person_workbook, person_sheet)
+    p_fields, _ = ingest.build_header_map(person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
+    person_dicts = [{f: (row[i] if i < len(row) else "") for f, i in p_fields.items()} for row in person_rows[1:]]
+    register = persons.parse_register(person_dicts)
+    alias_index = persons.AliasIndex(register)
+    ctx = persons.ResolutionContext(alias_index, name_overrides)
+
+    # --- documents ---
+    doc_rows = ingest.read_sheet(document_workbook, document_sheet)
+    d_fields, unknown_headers = ingest.build_header_map(doc_rows[0], config.DOCUMENT_HEADER_MAP, config.DOCUMENT_REQUIRED_FIELDS)
+    index_col = d_fields["index"]
+
+    canon_docs, blank_index, skipped_x, mismatches = [], [], [], []
+    unparsed_by_raw: dict[str, list] = {}
+    dates_by_override = 0
+    empty_count = 0
+    seen_index = Counter()
+
+    # Sheet rows are 1-based and row 1 is the header, so data starts at row 2.
+    for source_row, cells in enumerate(doc_rows[1:], start=2):
+        t = documents.triage(cells, index_col)
+        if t is documents.Triage.EMPTY:
+            empty_count += 1
+            continue
+        if t is documents.Triage.BLANK_INDEX:
+            blank_index.append([source_row, documents.classify_blank_index(cells, d_fields),
+                                " | ".join(c for c in cells if c)])
+            continue
+        if t is documents.Triage.X_SUFFIX:
+            idx = (cells[index_col] or "").strip()
+            skipped_x.append([source_row, idx, idx[:-1]])
+            continue
+        raw = documents.extract_row(cells, d_fields, source_row)
+        seen_index[raw.index] += 1
+        # One stripped form serves both the override lookup and the review grouping, so the
+        # "raw" value in unparsed-dates.csv has the same form that is probed in date_overrides.
+        raw_date = raw.date.strip()
+        if raw_date and raw_date in date_overrides:
+            dates_by_override += 1
+        doc = documents.to_canonical(raw, ctx, date_overrides)
+        if "unparsed_date" in doc.needs_review:
+            unparsed_by_raw.setdefault(raw_date, []).append(source_row)
+        if "index_file_mismatch" in doc.needs_review:
+            mismatches.append([source_row, raw.index, raw.file])
+        canon_docs.append(doc)
+
+    # REQ-TRIAGE-01: flag EVERY occurrence of a duplicated index and report all of them.
+    dup_indexes = {idx for idx, n in seen_index.items() if n > 1}
+    duplicates = []
+    for doc in canon_docs:
+        if doc.index in dup_indexes:
+            if "duplicate_index" not in doc.needs_review:
+                doc.needs_review.append("duplicate_index")
+            duplicates.append([doc.source_row, doc.index])
+
+    all_people = register + list(ctx.provisional.values())
+
+    # --- write canonical outputs ---
+    writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
+    writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")
+
+    # --- review files ---
+    # unparsed dates: most-frequent first, with example source rows + blank override cells so a
+    # corrected row can be pasted straight into overrides/dates.csv (same raw,iso,precision shape).
+    unparsed_rows = sorted(
+        ([raw, len(rows), " ".join(map(str, rows[:5])), "", ""] for raw, rows in unparsed_by_raw.items()),
+        key=lambda r: (-r[1], r[0]))
+    writers.write_review_csv(review_dir / "unparsed-dates.csv",
+                             ["raw", "count", "example_rows", "suggested_iso", "suggested_precision"], unparsed_rows)
+
+    unmatched_rows = []
+    for name, rows in sorted(ctx.unmatched.items()):
+        sid, score = alias_index.suggest(name)
+        unmatched_rows.append([name, len(rows), " ".join(map(str, rows[:5])),
+                               sid or "", f"{score:.2f}" if sid else ""])
+    writers.write_review_csv(review_dir / "unmatched-names.csv",
+                             ["raw", "count", "example_rows", "suggested_id", "suggested_score"], unmatched_rows)
+
+    writers.write_review_csv(review_dir / "duplicate-index.csv", ["source_row", "index"], duplicates)
+    writers.write_review_csv(review_dir / "blank-index-rows.csv", ["source_row", "kind", "content"], blank_index)
+    writers.write_review_csv(review_dir / "skipped-x-suffix.csv", ["source_row", "index", "base_index"], skipped_x)
+    writers.write_review_csv(review_dir / "ambiguous-receivers.csv", ["raw", "part", "source_row"], ctx.ambiguous)
+    writers.write_review_csv(review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)
+
+    dated = sum(1 for d in canon_docs if d.date_raw.strip())
+    unknown = sum(1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN")
+    unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%"
+
+    stats = {
+        "# INPUTS": "",
+        "document_rows_read": len(doc_rows) - 1,
+        "register_persons": len(register),
+        "unknown_headers": ", ".join(unknown_headers) or "(none)",
+        "# OUTPUTS": "",
+        "documents_emitted": len(canon_docs),
+        "provisional_persons": len(ctx.provisional),
+        "# DATES": "",
+        "dated_rows": dated,
+        "unparsed_dates": unknown,
+        "unknown_date_rate": f"{unknown_rate} (target <=5%)",
+        "distinct_unparsed_formats": len(unparsed_by_raw),
+        "# NAMES": "",
+        "unmatched_name_strings": len(ctx.unmatched),
+        "ambiguous_receivers": len(ctx.ambiguous),
+        "# ANOMALIES": "",
+        "empty_rows": empty_count,
+        "blank_index_rows": len(blank_index),
+        "skipped_x_suffix": len(skipped_x),
+        "duplicate_index_rows": len(duplicates),
+        "index_file_mismatches": len(mismatches),
+        "# OVERRIDES": "",
+        "date_overrides_loaded": len(date_overrides),
+        "name_overrides_loaded": len(name_overrides),
+        "dates_resolved_by_override": dates_by_override,
+        "names_resolved_by_override": ctx.override_hits,
+    }
+    writers.write_summary(review_dir / "summary.txt", stats)
+    return stats
+
+
+def main():
+    """Command-line entry point: run the normalizer with the paths from ``config``."""
+    parser = argparse.ArgumentParser(description="Normalize the family archive spreadsheets.")
+    parser.parse_args()  # no options defined; still serves --help and rejects unknown arguments
+    date_overrides, name_overrides = overrides_mod.load_overrides(
+        config.OVERRIDES_DIR / "dates.csv", config.OVERRIDES_DIR / "names.csv")
+    stats = run(
+        document_workbook=config.DOCUMENT_WORKBOOK, document_sheet=config.DOCUMENT_SHEET,
+        person_workbook=config.PERSON_WORKBOOK, person_sheet=config.PERSON_SHEET,
+        out_dir=config.OUT_DIR, review_dir=config.REVIEW_DIR,
+        date_overrides=date_overrides, name_overrides=name_overrides)
+    print("Normalization complete:")
+    for k, v in stats.items():
+        print(f" {k}: {v}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/import-normalizer/tests/test_normalize.py b/tools/import-normalizer/tests/test_normalize.py
new file mode 100644
index 00000000..2fd26f29
--- /dev/null
+++ b/tools/import-normalizer/tests/test_normalize.py
@@ -0,0 +1,62 @@
+import openpyxl
+import normalize
+
+
+def _doc_wb(tmp_path):
+    """Save a fixture document workbook (sheet "Familienarchiv") under tmp_path and return its path."""
+    wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Familienarchiv"
+    ws.append(["Index", "Datei", "Box", "Mappe", "BriefeschreiberIn", "EmpfängerIn",
+               "Datum des Briefes", "Ort", "Schlagwort", "Inhalt"])
+    ws.append(["W-0001", r"..\__scan\W-0001.pdf", "V", "1", "Walter de Gruyter",
+               "Eugenie Müller", "15.2.1888", "Rotterdam", "Brautbriefe", "Geschäftsreise"])
+    ws.append(["W-0001x", r"..\__scan\W-0001x.pdf", "", "", "Walter de Gruyter", "Eugenie Müller", "", "", "", ""])
+    ws.append(["", "", "", "", "Section banner row", "", "", "", "", ""])
+    ws.append(["C-0001", "", "", "", "Hans Wittkopf", "", "Freitag 1919", "", "", ""])
+    ws.append(["W-0001", r"..\__scan\W-0001.pdf", "V", "1", "Walter de Gruyter",
+               "Eugenie Müller", "15.2.1888", "Rotterdam", "Brautbriefe", "dup"])
+    p = tmp_path / "docs.xlsx"; wb.save(p); return p
+
+
+def _person_wb(tmp_path):
+    """Save a fixture person register (sheet "Tabelle1") under tmp_path and return its path."""
+    wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Tabelle1"
+    ws.append(["Generation", "Familienname", "Vorname", "geb als", "Geburtsdatum",
+               "Geburtsort", "Todesdatum", "Sterbeort", "verheiratet mit", "Bemerkung"])
+    ws.append(["G 1", "de Gruyter", "Walter", "", "", "", "", "", "", ""])
+    ws.append(["G 1", "de Gruyter", "Eugenie", "Müller", "", "", "", "", "", ""])
+    p = tmp_path / "persons.xlsx"; wb.save(p); return p
+
+
+def test_run_end_to_end(tmp_path):
+    """Run the whole pipeline on the fixtures, then re-run it to check the outputs are unchanged."""
+    out_dir = tmp_path / "out"; review_dir = tmp_path / "review"
+    stats = normalize.run(
+        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+        out_dir=out_dir, review_dir=review_dir,
+        date_overrides={}, name_overrides={})
+    assert (out_dir / "canonical-documents.xlsx").exists()
+    assert (out_dir / "canonical-persons.xlsx").exists()
+    assert stats["documents_emitted"] == 3  # W-0001, C-0001, W-0001 (dup) — x and blank excluded
+    assert stats["skipped_x_suffix"] == 1
+    assert stats["blank_index_rows"] == 1
+    assert stats["duplicate_index_rows"] == 2
+    assert (review_dir / "skipped-x-suffix.csv").exists()
+    assert (review_dir / "unparsed-dates.csv").exists()
+    # C-0001's "Freitag 1919" is unparseable -> must appear in the review file (NFR-DATA-01)
+    assert "Freitag 1919" in (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8")
+
+    # determinism (NFR-IDEM-01): a second run yields identical canonical content + review files
+    def _matrix(p):
+        wb = openpyxl.load_workbook(p)
+        return [[c.value for c in row] for row in wb.active.iter_rows()]
+    docs1 = _matrix(out_dir / "canonical-documents.xlsx")
+    persons1 = _matrix(out_dir / "canonical-persons.xlsx")
+    unparsed1 = (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8")
+    normalize.run(document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+                  person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+                  out_dir=out_dir, review_dir=review_dir, date_overrides={}, name_overrides={})
+    assert _matrix(out_dir / "canonical-documents.xlsx") == docs1
+    assert _matrix(out_dir / "canonical-persons.xlsx") == persons1
+    assert (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8") == unparsed1
+    assert len(docs1) == 4  # header + 3 docs