"""Orchestrator: read raw workbooks -> canonical outputs + review reports."""

import argparse
from collections import Counter
from pathlib import Path

import config
import ingest
import persons
import documents
import overrides as overrides_mod
import writers


def _example_rows(rows, limit=5):
    """Return the first *limit* source-row numbers as one space-separated string."""
    return " ".join(map(str, rows[:limit]))


def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
        out_dir, review_dir, date_overrides, name_overrides) -> dict:
    """Normalize the person and document sheets and write outputs and review files.

    Args:
        document_workbook: Workbook handed to ``ingest.read_sheet`` for documents.
        document_sheet: Sheet within ``document_workbook`` to read.
        person_workbook: Workbook handed to ``ingest.read_sheet`` for persons.
        person_sheet: Sheet within ``person_workbook`` to read.
        out_dir: Directory receiving ``canonical-documents.xlsx`` and
            ``canonical-persons.xlsx``.
        review_dir: Directory receiving the review CSVs and ``summary.txt``.
        date_overrides: Container of raw date strings (supports ``in`` and
            ``len``); also passed through to ``documents.to_canonical``.
        name_overrides: Name overrides passed to ``persons.ResolutionContext``
            (must support ``len``).

    Returns:
        The statistics mapping that is also written to ``summary.txt``. Keys
        starting with ``#`` carry an empty value; presumably they are rendered
        as section headings by ``writers.write_summary`` (not visible here).

    Raises:
        IndexError: If either sheet has no rows at all, because the first row
            of each sheet is read as the header.
    """
    out_dir, review_dir = Path(out_dir), Path(review_dir)

    # --- persons ---
    person_rows = ingest.read_sheet(person_workbook, person_sheet)
    p_fields, _ = ingest.build_header_map(
        person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
    # Rows shorter than the header are padded with "" for the missing columns.
    person_dicts = [
        {field: (row[col] if col < len(row) else "") for field, col in p_fields.items()}
        for row in person_rows[1:]
    ]
    register = persons.parse_register(person_dicts)
    alias_index = persons.AliasIndex(register)
    ctx = persons.ResolutionContext(alias_index, name_overrides)

    # --- documents ---
    doc_rows = ingest.read_sheet(document_workbook, document_sheet)
    d_fields, unknown_headers = ingest.build_header_map(
        doc_rows[0], config.DOCUMENT_HEADER_MAP, config.DOCUMENT_REQUIRED_FIELDS)
    index_col = d_fields["index"]

    canon_docs, blank_index, skipped_x, mismatches = [], [], [], []
    unparsed_by_raw: dict[str, list] = {}
    dates_by_override = 0
    empty_count = 0
    seen_index = Counter()

    # The header is the first sheet row, so data rows are numbered from 2.
    for source_row, cells in enumerate(doc_rows[1:], start=2):
        t = documents.triage(cells, index_col)
        if t is documents.Triage.EMPTY:
            empty_count += 1
            continue
        if t is documents.Triage.BLANK_INDEX:
            blank_index.append([
                source_row,
                documents.classify_blank_index(cells, d_fields),
                " | ".join(c for c in cells if c),
            ])
            continue
        if t is documents.Triage.X_SUFFIX:
            idx = (cells[index_col] or "").strip()
            # base_index is the index with its final character removed.
            skipped_x.append([source_row, idx, idx[:-1]])
            continue

        raw = documents.extract_row(cells, d_fields, source_row)
        seen_index[raw.index] += 1

        # One stripped key serves both the override lookup and the unparsed-date
        # grouping, so a value copied from unparsed-dates.csv into the overrides
        # file is exactly the string this lookup tests, and whitespace variants
        # of the same raw date are counted together.
        date_key = raw.date.strip()
        if date_key and date_key in date_overrides:
            dates_by_override += 1

        doc = documents.to_canonical(raw, ctx, date_overrides)
        if "unparsed_date" in doc.needs_review:
            unparsed_by_raw.setdefault(date_key, []).append(source_row)
        if "index_file_mismatch" in doc.needs_review:
            mismatches.append([source_row, raw.index, raw.file])
        canon_docs.append(doc)

    # REQ-TRIAGE-01: flag EVERY occurrence of a duplicated index and report all of them.
    dup_indexes = {idx for idx, n in seen_index.items() if n > 1}
    duplicates = []
    for doc in canon_docs:
        if doc.index in dup_indexes:
            if "duplicate_index" not in doc.needs_review:
                doc.needs_review.append("duplicate_index")
            duplicates.append([doc.source_row, doc.index])

    all_people = register + list(ctx.provisional.values())

    # --- write canonical outputs ---
    writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
    writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")

    # --- review files ---
    # Unparsed dates: most frequent first (ties broken by raw text), with example
    # source rows and blank override cells so that a corrected row can be pasted
    # straight into overrides/dates.csv (same raw,iso,precision shape).
    unparsed_rows = sorted(
        ([raw_date, len(rows), _example_rows(rows), "", ""]
         for raw_date, rows in unparsed_by_raw.items()),
        key=lambda r: (-r[1], r[0]))
    writers.write_review_csv(
        review_dir / "unparsed-dates.csv",
        ["raw", "count", "example_rows", "suggested_iso", "suggested_precision"],
        unparsed_rows)

    unmatched_rows = []
    for name, rows in sorted(ctx.unmatched.items()):
        sid, score = alias_index.suggest(name)
        # The score is only reported when a suggested id accompanies it.
        unmatched_rows.append([
            name, len(rows), _example_rows(rows),
            sid or "", f"{score:.2f}" if sid else "",
        ])
    writers.write_review_csv(
        review_dir / "unmatched-names.csv",
        ["raw", "count", "example_rows", "suggested_id", "suggested_score"],
        unmatched_rows)

    writers.write_review_csv(
        review_dir / "duplicate-index.csv", ["source_row", "index"], duplicates)
    writers.write_review_csv(
        review_dir / "blank-index-rows.csv", ["source_row", "kind", "content"], blank_index)
    writers.write_review_csv(
        review_dir / "skipped-x-suffix.csv", ["source_row", "index", "base_index"], skipped_x)
    writers.write_review_csv(
        review_dir / "ambiguous-receivers.csv", ["raw", "part", "source_row"], ctx.ambiguous)
    writers.write_review_csv(
        review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)

    # The unknown-date rate is measured against rows that actually carry a date.
    dated = sum(1 for d in canon_docs if d.date_raw.strip())
    unknown = sum(
        1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN")
    unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%"

    stats = {
        "# INPUTS": "",
        "document_rows_read": len(doc_rows) - 1,
        "register_persons": len(register),
        "unknown_headers": ", ".join(unknown_headers) or "(none)",
        "# OUTPUTS": "",
        "documents_emitted": len(canon_docs),
        "provisional_persons": len(ctx.provisional),
        "# DATES": "",
        "dated_rows": dated,
        "unparsed_dates": unknown,
        "unknown_date_rate": f"{unknown_rate} (target <=5%)",
        "distinct_unparsed_formats": len(unparsed_by_raw),
        "# NAMES": "",
        "unmatched_name_strings": len(ctx.unmatched),
        "ambiguous_receivers": len(ctx.ambiguous),
        "# ANOMALIES": "",
        "empty_rows": empty_count,
        "blank_index_rows": len(blank_index),
        "skipped_x_suffix": len(skipped_x),
        "duplicate_index_rows": len(duplicates),
        "index_file_mismatches": len(mismatches),
        "# OVERRIDES": "",
        "date_overrides_loaded": len(date_overrides),
        "name_overrides_loaded": len(name_overrides),
        "dates_resolved_by_override": dates_by_override,
        "names_resolved_by_override": ctx.override_hits,
    }
    writers.write_summary(review_dir / "summary.txt", stats)
    return stats


def main():
    """Run the normalization with the paths from ``config`` and print the stats."""
    parser = argparse.ArgumentParser(
        description="Normalize the family archive spreadsheets.")
    # No options are defined; parsing still provides --help and rejects
    # unexpected command-line arguments.
    parser.parse_args()

    date_overrides, name_overrides = overrides_mod.load_overrides(
        config.OVERRIDES_DIR / "dates.csv", config.OVERRIDES_DIR / "names.csv")
    stats = run(
        document_workbook=config.DOCUMENT_WORKBOOK,
        document_sheet=config.DOCUMENT_SHEET,
        person_workbook=config.PERSON_WORKBOOK,
        person_sheet=config.PERSON_SHEET,
        out_dir=config.OUT_DIR,
        review_dir=config.REVIEW_DIR,
        date_overrides=date_overrides,
        name_overrides=name_overrides)

    print("Normalization complete:")
    for k, v in stats.items():
        print(f"  {k}: {v}")


if __name__ == "__main__":
    main()