feat(normalizer): orchestrator + end-to-end integration test
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
151
tools/import-normalizer/normalize.py
Normal file
151
tools/import-normalizer/normalize.py
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
"""Orchestrator: read raw workbooks -> canonical outputs + review reports."""
|
||||||
|
import argparse
|
||||||
|
from collections import Counter
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import config
|
||||||
|
import ingest
|
||||||
|
import persons
|
||||||
|
import documents
|
||||||
|
import overrides as overrides_mod
|
||||||
|
import writers
|
||||||
|
|
||||||
|
|
||||||
|
def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
        out_dir, review_dir, date_overrides, name_overrides) -> dict:
    """Normalize the person and document sheets and write all outputs.

    The person sheet is read first and turned into a register, an alias index
    and a resolution context. Every document row is then triaged: empty rows
    are only counted, blank-index and x-suffix rows go to review lists, and the
    remaining rows are converted to canonical documents.

    Files written:
      * ``out_dir``: ``canonical-documents.xlsx``, ``canonical-persons.xlsx``
      * ``review_dir``: ``unparsed-dates.csv``, ``unmatched-names.csv``,
        ``duplicate-index.csv``, ``blank-index-rows.csv``,
        ``skipped-x-suffix.csv``, ``ambiguous-receivers.csv``,
        ``index-file-mismatch.csv`` and ``summary.txt``

    Args:
        document_workbook: Workbook holding the document sheet.
        document_sheet: Name of the document sheet.
        person_workbook: Workbook holding the person register sheet.
        person_sheet: Name of the person sheet.
        out_dir: Directory for the canonical workbooks (wrapped in ``Path``).
        review_dir: Directory for the review files (wrapped in ``Path``).
        date_overrides: Container of date overrides; used here for membership
            tests and ``len`` and handed on to ``documents.to_canonical``.
        name_overrides: Name overrides handed to ``persons.ResolutionContext``;
            only its ``len`` is used directly here.

    Returns:
        The statistics dict, in insertion order, that is also passed to
        ``writers.write_summary``. Keys starting with ``#`` carry an empty
        string (presumably section headings for the summary - confirm against
        the writer).
    """
    out_dir = Path(out_dir)
    review_dir = Path(review_dir)

    # --- persons: register, alias index and resolution context ---
    person_rows = ingest.read_sheet(person_workbook, person_sheet)
    p_fields, _ = ingest.build_header_map(
        person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
    person_dicts = []
    for row in person_rows[1:]:
        # Rows may be shorter than the header; missing cells become "".
        person_dicts.append(
            {field: (row[col] if col < len(row) else "") for field, col in p_fields.items()})
    register = persons.parse_register(person_dicts)
    alias_index = persons.AliasIndex(register)
    ctx = persons.ResolutionContext(alias_index, name_overrides)

    # --- documents: header mapping and per-row triage ---
    doc_rows = ingest.read_sheet(document_workbook, document_sheet)
    d_fields, unknown_headers = ingest.build_header_map(
        doc_rows[0], config.DOCUMENT_HEADER_MAP, config.DOCUMENT_REQUIRED_FIELDS)
    index_col = d_fields["index"]

    canon_docs = []
    blank_index = []
    skipped_x = []
    mismatches = []
    unparsed_by_raw: dict[str, list] = {}
    seen_index = Counter()
    empty_count = 0
    dates_by_override = 0

    # Data rows are numbered from 2, so the header row counts as row 1.
    for source_row, cells in enumerate(doc_rows[1:], start=2):
        verdict = documents.triage(cells, index_col)

        if verdict is documents.Triage.EMPTY:
            empty_count += 1
            continue

        if verdict is documents.Triage.BLANK_INDEX:
            kind = documents.classify_blank_index(cells, d_fields)
            content = " | ".join(filter(None, cells))
            blank_index.append([source_row, kind, content])
            continue

        if verdict is documents.Triage.X_SUFFIX:
            full_index = (cells[index_col] or "").strip()
            # Third column is the index with its last character dropped.
            skipped_x.append([source_row, full_index, full_index[:-1]])
            continue

        raw = documents.extract_row(cells, d_fields, source_row)
        seen_index[raw.index] += 1

        # NOTE(review): override hits are counted on the stripped date, while
        # unparsed dates below are grouped by the unstripped raw.date -
        # confirm this difference is intended.
        date_key = raw.date.strip()
        if date_key and date_key in date_overrides:
            dates_by_override += 1

        doc = documents.to_canonical(raw, ctx, date_overrides)
        if "unparsed_date" in doc.needs_review:
            unparsed_by_raw.setdefault(raw.date, []).append(source_row)
        if "index_file_mismatch" in doc.needs_review:
            mismatches.append([source_row, raw.index, raw.file])
        canon_docs.append(doc)

    # REQ-TRIAGE-01: flag EVERY occurrence of a duplicated index and report all of them.
    repeated = {index for index, hits in seen_index.items() if hits > 1}
    duplicates = []
    for doc in canon_docs:
        if doc.index not in repeated:
            continue
        if "duplicate_index" not in doc.needs_review:
            doc.needs_review.append("duplicate_index")
        duplicates.append([doc.source_row, doc.index])

    all_people = register + list(ctx.provisional.values())

    # --- canonical outputs ---
    writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
    writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")

    # --- review files ---
    # Unparsed dates: most frequent first (ties ordered by raw text), with up to five
    # example source rows and two blank cells so a corrected row can be pasted straight
    # into overrides/dates.csv (same raw,iso,precision shape).
    unparsed_rows = [
        [raw_text, len(rows), " ".join(str(n) for n in rows[:5]), "", ""]
        for raw_text, rows in unparsed_by_raw.items()
    ]
    unparsed_rows.sort(key=lambda entry: (-entry[1], entry[0]))
    writers.write_review_csv(
        review_dir / "unparsed-dates.csv",
        ["raw", "count", "example_rows", "suggested_iso", "suggested_precision"],
        unparsed_rows)

    # Unmatched names, alphabetically, each with the alias index's suggestion (if any).
    unmatched_rows = []
    for name, rows in sorted(ctx.unmatched.items()):
        sid, score = alias_index.suggest(name)
        examples = " ".join(str(n) for n in rows[:5])
        shown_score = f"{score:.2f}" if sid else ""
        unmatched_rows.append([name, len(rows), examples, sid or "", shown_score])
    writers.write_review_csv(
        review_dir / "unmatched-names.csv",
        ["raw", "count", "example_rows", "suggested_id", "suggested_score"],
        unmatched_rows)

    # The remaining review files are plain dumps of the lists collected above.
    simple_reports = [
        ("duplicate-index.csv", ["source_row", "index"], duplicates),
        ("blank-index-rows.csv", ["source_row", "kind", "content"], blank_index),
        ("skipped-x-suffix.csv", ["source_row", "index", "base_index"], skipped_x),
        ("ambiguous-receivers.csv", ["raw", "part", "source_row"], ctx.ambiguous),
        ("index-file-mismatch.csv", ["source_row", "index", "file"], mismatches),
    ]
    for filename, header, rows in simple_reports:
        writers.write_review_csv(review_dir / filename, header, rows)

    # --- statistics ---
    # Only documents with a non-blank raw date count towards the unknown-date rate.
    dated = 0
    unknown = 0
    for doc in canon_docs:
        if not doc.date_raw.strip():
            continue
        dated += 1
        if doc.date_precision == "UNKNOWN":
            unknown += 1
    if dated:
        unknown_rate = f"{(100 * unknown / dated):.1f}%"
    else:
        unknown_rate = "0.0%"

    stats = {
        "# INPUTS": "",
        "document_rows_read": len(doc_rows) - 1,
        "register_persons": len(register),
        "unknown_headers": ", ".join(unknown_headers) or "(none)",

        "# OUTPUTS": "",
        "documents_emitted": len(canon_docs),
        "provisional_persons": len(ctx.provisional),

        "# DATES": "",
        "dated_rows": dated,
        "unparsed_dates": unknown,
        "unknown_date_rate": f"{unknown_rate} (target <=5%)",
        "distinct_unparsed_formats": len(unparsed_by_raw),

        "# NAMES": "",
        "unmatched_name_strings": len(ctx.unmatched),
        "ambiguous_receivers": len(ctx.ambiguous),

        "# ANOMALIES": "",
        "empty_rows": empty_count,
        "blank_index_rows": len(blank_index),
        "skipped_x_suffix": len(skipped_x),
        "duplicate_index_rows": len(duplicates),
        "index_file_mismatches": len(mismatches),

        "# OVERRIDES": "",
        "date_overrides_loaded": len(date_overrides),
        "name_overrides_loaded": len(name_overrides),
        "dates_resolved_by_override": dates_by_override,
        "names_resolved_by_override": ctx.override_hits,
    }
    writers.write_summary(review_dir / "summary.txt", stats)
    return stats
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """Command-line entry point: run the normalizer with the paths from ``config``.

    Loads the date and name overrides from ``config.OVERRIDES_DIR``, calls
    ``run`` with the configured workbooks, sheets and output directories, and
    prints every entry of the returned statistics.
    """
    # No options are defined; the parser is only invoked so that --help works
    # and unexpected arguments are rejected. Its result is not needed.
    argparse.ArgumentParser(
        description="Normalize the family archive spreadsheets.",
    ).parse_args()

    overrides_dir = config.OVERRIDES_DIR
    date_overrides, name_overrides = overrides_mod.load_overrides(
        overrides_dir / "dates.csv",
        overrides_dir / "names.csv",
    )

    stats = run(
        document_workbook=config.DOCUMENT_WORKBOOK,
        document_sheet=config.DOCUMENT_SHEET,
        person_workbook=config.PERSON_WORKBOOK,
        person_sheet=config.PERSON_SHEET,
        out_dir=config.OUT_DIR,
        review_dir=config.REVIEW_DIR,
        date_overrides=date_overrides,
        name_overrides=name_overrides,
    )

    print("Normalization complete:")
    for label, value in stats.items():
        print(f" {label}: {value}")


if __name__ == "__main__":
    main()
|
||||||
59
tools/import-normalizer/tests/test_normalize.py
Normal file
59
tools/import-normalizer/tests/test_normalize.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import openpyxl
|
||||||
|
import normalize
|
||||||
|
|
||||||
|
|
||||||
|
def _doc_wb(tmp_path):
    """Write the document workbook fixture to ``tmp_path`` and return its path.

    The sheet "Familienarchiv" holds a header row followed by five data rows:
    W-0001, W-0001x, a row with an empty index cell, C-0001 (date text
    "Freitag 1919") and a second W-0001 row.
    """
    header = ["Index", "Datei", "Box", "Mappe", "BriefeschreiberIn", "EmpfängerIn",
              "Datum des Briefes", "Ort", "Schlagwort", "Inhalt"]
    data_rows = [
        ["W-0001", r"..\__scan\W-0001.pdf", "V", "1", "Walter de Gruyter",
         "Eugenie Müller", "15.2.1888", "Rotterdam", "Brautbriefe", "Geschäftsreise"],
        ["W-0001x", r"..\__scan\W-0001x.pdf", "", "", "Walter de Gruyter",
         "Eugenie Müller", "", "", "", ""],
        ["", "", "", "", "Section banner row", "", "", "", "", ""],
        ["C-0001", "", "", "", "Hans Wittkopf", "", "Freitag 1919", "", "", ""],
        ["W-0001", r"..\__scan\W-0001.pdf", "V", "1", "Walter de Gruyter",
         "Eugenie Müller", "15.2.1888", "Rotterdam", "Brautbriefe", "dup"],
    ]

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Familienarchiv"
    for values in (header, *data_rows):
        sheet.append(values)

    path = tmp_path / "docs.xlsx"
    workbook.save(path)
    return path
|
||||||
|
|
||||||
|
|
||||||
|
def _person_wb(tmp_path):
    """Write the person register fixture to ``tmp_path`` and return its path.

    The sheet "Tabelle1" holds a header row and two persons: Walter de Gruyter
    and Eugenie de Gruyter ("geb als" Müller).
    """
    header = ["Generation", "Familienname", "Vorname", "geb als", "Geburtsdatum",
              "Geburtsort", "Todesdatum", "Sterbeort", "verheiratet mit", "Bemerkung"]
    people = [
        ["G 1", "de Gruyter", "Walter", "", "", "", "", "", "", ""],
        ["G 1", "de Gruyter", "Eugenie", "Müller", "", "", "", "", "", ""],
    ]

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Tabelle1"
    for values in (header, *people):
        sheet.append(values)

    path = tmp_path / "persons.xlsx"
    workbook.save(path)
    return path
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_end_to_end(tmp_path):
    """Run the orchestrator on the fixtures, check its outputs, then check a rerun matches."""
    out_dir = tmp_path / "out"
    review_dir = tmp_path / "review"
    documents_xlsx = out_dir / "canonical-documents.xlsx"
    persons_xlsx = out_dir / "canonical-persons.xlsx"
    unparsed_csv = review_dir / "unparsed-dates.csv"

    def _normalize():
        # Both fixture workbooks are rewritten for every run.
        return normalize.run(
            document_workbook=_doc_wb(tmp_path),
            document_sheet="Familienarchiv",
            person_workbook=_person_wb(tmp_path),
            person_sheet="Tabelle1",
            out_dir=out_dir,
            review_dir=review_dir,
            date_overrides={},
            name_overrides={},
        )

    def _matrix(path):
        # Cell values of the active sheet, row by row.
        sheet = openpyxl.load_workbook(path).active
        return [[cell.value for cell in row] for row in sheet.iter_rows()]

    stats = _normalize()

    assert documents_xlsx.exists()
    assert persons_xlsx.exists()
    assert stats["documents_emitted"] == 3  # W-0001, C-0001, W-0001 (dup) — x and blank excluded
    assert stats["skipped_x_suffix"] == 1
    assert stats["blank_index_rows"] == 1
    assert stats["duplicate_index_rows"] == 2
    assert (review_dir / "skipped-x-suffix.csv").exists()
    assert unparsed_csv.exists()
    # C-0001's "Freitag 1919" is unparseable -> must appear in the review file (NFR-DATA-01)
    assert "Freitag 1919" in unparsed_csv.read_text(encoding="utf-8")

    # determinism (NFR-IDEM-01): a second run yields identical canonical content + review files
    docs1 = _matrix(documents_xlsx)
    persons1 = _matrix(persons_xlsx)
    unparsed1 = unparsed_csv.read_text(encoding="utf-8")

    _normalize()

    assert _matrix(documents_xlsx) == docs1
    assert _matrix(persons_xlsx) == persons1
    assert unparsed_csv.read_text(encoding="utf-8") == unparsed1
    assert len(docs1) == 4  # header + 3 docs
|
||||||
Reference in New Issue
Block a user