Files
familienarchiv/tools/import-normalizer/normalize.py
Marcel 94a40237f4 feat(normalizer): generate structured tags from Schlagwort + Inhalt fields
Adds tags.py module implementing a three-outcome heuristic:
- Individual-to-individual correspondence tags ("Clara an Herbert") → dropped
- Group/collective correspondence ("Clara an Kinder", "Walter an Geschwister") → Briefwechsel/<value>
- Semantic/event tags ("Brautbriefe", "Alltag", "zur Hochzeit") → Themen/<value>

Three correspondence patterns detected: space-an-space, starts-with-"an ",
and abbreviated-sender form ("Maria W.an Clara").

COLLECTIVE_TERMS in config.py extended with 17 plural/group relational terms
(söhne, brüder, schwiegereltern, cousinen, etc.) confirmed against the full Excel.

Also adds two-phase summary mining: every run emits review/tag-candidates.csv;
subsequent runs apply keywords from overrides/approved-themes.csv as Themen tags.

Outputs: canonical-documents.xlsx gets pipe-separated "Parent/Child" tag paths;
canonical-tag-tree.xlsx provides the full tag hierarchy for backend pre-import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:47:36 +02:00

172 lines
8.2 KiB
Python

"""Orchestrator: read raw workbooks -> canonical outputs + review reports."""
import argparse
from collections import Counter
from pathlib import Path
import config
import ingest
import persons
import documents
import overrides as overrides_mod
import tags as _tags
import writers
def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
        out_dir, review_dir, date_overrides, name_overrides,
        approved_themes_path=None) -> dict:
    """Normalize the raw workbooks into canonical outputs plus review reports.

    Reads the person register and the document sheet via ``ingest``, converts
    every document row that is not triaged away with ``documents.to_canonical``,
    writes the canonical workbooks into ``out_dir`` and the review CSVs plus
    ``summary.txt`` into ``review_dir``.

    Args:
        document_workbook: Workbook handed to ``ingest.read_sheet`` for documents.
        document_sheet: Sheet selector for the document workbook.
        person_workbook: Workbook handed to ``ingest.read_sheet`` for persons.
        person_sheet: Sheet selector for the person workbook.
        out_dir: Directory (str or Path) receiving the canonical ``.xlsx`` files.
        review_dir: Directory (str or Path) receiving review CSVs and the summary.
        date_overrides: Container keyed by stripped raw date strings; it is
            membership-tested here and forwarded to ``documents.to_canonical``.
        name_overrides: Forwarded to ``persons.ResolutionContext``.
        approved_themes_path: Optional path given to
            ``_tags.load_approved_themes``; when falsy, no approved themes apply.

    Returns:
        The statistics mapping that is also written to ``summary.txt``. Keys
        starting with ``#`` carry an empty value (they look like section
        headings for the summary writer — confirm against ``writers``).
    """
    out_dir, review_dir = Path(out_dir), Path(review_dir)
    approved_themes = _tags.load_approved_themes(Path(approved_themes_path)) if approved_themes_path else set()

    # --- persons ---
    person_rows = ingest.read_sheet(person_workbook, person_sheet)
    p_fields, _ = ingest.build_header_map(person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
    # Rows shorter than the header are padded with "" for the missing cells.
    person_dicts = [{f: (row[i] if i < len(row) else "") for f, i in p_fields.items()} for row in person_rows[1:]]
    register = persons.parse_register(person_dicts)
    alias_index = persons.AliasIndex(register)
    given_names = persons.build_given_names(register, config.EXTRA_GIVEN_NAMES)
    ctx = persons.ResolutionContext(alias_index, name_overrides, given_names=given_names)

    # --- documents ---
    doc_rows = ingest.read_sheet(document_workbook, document_sheet)
    d_fields, unknown_headers = ingest.build_header_map(doc_rows[0], config.DOCUMENT_HEADER_MAP, config.DOCUMENT_REQUIRED_FIELDS)
    index_col = d_fields["index"]
    canon_docs, blank_index, skipped_x, mismatches = [], [], [], []
    unparsed_by_raw: dict[str, list] = {}
    dates_by_override = 0
    empty_count = 0
    seen_index = Counter()
    # The approved themes do not change while rows are processed, so the
    # immutable view is built once instead of once per document row.
    frozen_themes = frozenset(approved_themes)
    # start=2: the header occupies row 1, so the first data row is row 2.
    for source_row, cells in enumerate(doc_rows[1:], start=2):
        t = documents.triage(cells, index_col)
        if t is documents.Triage.EMPTY:
            empty_count += 1
            continue
        if t is documents.Triage.BLANK_INDEX:
            blank_index.append([source_row, documents.classify_blank_index(cells, d_fields),
                                " | ".join(c for c in cells if c)])
            continue
        if t is documents.Triage.X_SUFFIX:
            idx = (cells[index_col] or "").strip()
            # base_index is the index without its final character.
            skipped_x.append([source_row, idx, idx[:-1]])
            continue
        raw = documents.extract_row(cells, d_fields, source_row)
        seen_index[raw.index] += 1
        stripped_date = raw.date.strip()
        if stripped_date and stripped_date in date_overrides:
            dates_by_override += 1
        doc = documents.to_canonical(raw, ctx, date_overrides, frozen_themes)
        if "unparsed_date" in doc.needs_review:
            unparsed_by_raw.setdefault(raw.date, []).append(source_row)
        if "index_file_mismatch" in doc.needs_review:
            mismatches.append([source_row, raw.index, raw.file])
        canon_docs.append(doc)

    # REQ-TRIAGE-01: flag EVERY occurrence of a duplicated index and report all of them.
    dup_indexes = {idx for idx, n in seen_index.items() if n > 1}
    duplicates = []
    for doc in canon_docs:
        if doc.index in dup_indexes:
            if "duplicate_index" not in doc.needs_review:
                doc.needs_review.append("duplicate_index")
            duplicates.append([doc.source_row, doc.index])

    all_people = register + list(ctx.provisional.values())

    # --- write canonical outputs ---
    writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
    writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")
    all_tag_paths = [path for doc in canon_docs for path in doc.tags]
    writers.write_tag_tree_xlsx(_tags.build_tag_tree(all_tag_paths), out_dir / "canonical-tag-tree.xlsx")

    # --- review files ---
    # unparsed dates: most-frequent first, with example source rows + blank override cells so a
    # corrected row can be pasted straight into overrides/dates.csv (same raw,iso,precision shape).
    unparsed_rows = sorted(
        ([raw, len(rows), " ".join(map(str, rows[:5])), "", ""] for raw, rows in unparsed_by_raw.items()),
        key=lambda r: (-r[1], r[0]))
    writers.write_review_csv(review_dir / "unparsed-dates.csv",
                             ["raw", "count", "example_rows", "suggested_iso", "suggested_precision"], unparsed_rows)
    writers.write_review_csv(review_dir / "duplicate-index.csv", ["source_row", "index"], duplicates)
    writers.write_review_csv(review_dir / "blank-index-rows.csv", ["source_row", "kind", "content"], blank_index)
    writers.write_review_csv(review_dir / "skipped-x-suffix.csv", ["source_row", "index", "base_index"], skipped_x)

    # Group unresolved names by (category, name); sort by category, then by
    # descending occurrence count, then by name.
    unresolved_agg: dict[tuple, list] = {}
    for name, category, row in ctx.unresolved:
        unresolved_agg.setdefault((category, name), []).append(row)
    unresolved_rows = sorted(
        ([cat, name, len(rows), " ".join(map(str, sorted(rows)[:5]))]
         for (cat, name), rows in unresolved_agg.items()),
        key=lambda r: (r[0], -r[2], r[1]))
    writers.write_review_csv(review_dir / "unresolved-names.csv",
                             ["category", "raw", "count", "example_rows"], unresolved_rows)
    writers.write_review_csv(review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)

    all_summaries = [doc.summary for doc in canon_docs if doc.summary]
    candidates = _tags.mine_summary_candidates(all_summaries)
    writers.write_review_csv(review_dir / "tag-candidates.csv", ["candidate", "count"],
                             [[c, n] for c, n in candidates])

    # --- summary statistics ---
    dated = sum(1 for d in canon_docs if d.date_raw.strip())
    unknown = sum(1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN")
    unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%"
    # One pass over the unresolved list; a Counter yields 0 for absent categories.
    unresolved_by_category = Counter(category for _, category, _ in ctx.unresolved)
    stats = {
        "# INPUTS": "",
        "document_rows_read": len(doc_rows) - 1,
        "register_persons": len(register),
        "unknown_headers": ", ".join(unknown_headers) or "(none)",
        "# OUTPUTS": "",
        "documents_emitted": len(canon_docs),
        "provisional_persons": len(ctx.provisional),
        "# DATES": "",
        "dated_rows": dated,
        "unparsed_dates": unknown,
        "unknown_date_rate": f"{unknown_rate} (target <=5%)",
        "distinct_unparsed_formats": len(unparsed_by_raw),
        "# NAMES": "",
        "unmatched_name_strings": len(ctx.unmatched),
        "unresolved_name_occurrences": len(ctx.unresolved),
        "unresolved_unknown": unresolved_by_category["unknown"],
        "unresolved_single_token": unresolved_by_category["single_token"],
        "unresolved_relational": unresolved_by_category["relational"],
        "unresolved_collective": unresolved_by_category["collective"],
        "unresolved_prose": unresolved_by_category["prose"],
        "unresolved_ambiguous_pair": unresolved_by_category["ambiguous_pair"],
        "# ANOMALIES": "",
        "empty_rows": empty_count,
        "blank_index_rows": len(blank_index),
        "skipped_x_suffix": len(skipped_x),
        "duplicate_index_rows": len(duplicates),
        "index_file_mismatches": len(mismatches),
        "# OVERRIDES": "",
        "date_overrides_loaded": len(date_overrides),
        "name_overrides_loaded": len(name_overrides),
        "dates_resolved_by_override": dates_by_override,
        "names_resolved_by_override": ctx.override_hits,
    }
    writers.write_summary(review_dir / "summary.txt", stats)
    return stats
def main():
    """Command-line entry point: load overrides, run the normalizer, print stats.

    No options are declared; parsing the arguments still provides ``--help``
    and rejects unexpected arguments. All paths and sheet names come from
    ``config``.
    """
    argparse.ArgumentParser(description="Normalize the family archive spreadsheets.").parse_args()

    overrides_dir = config.OVERRIDES_DIR
    dates_csv = overrides_dir / "dates.csv"
    names_csv = overrides_dir / "names.csv"
    date_overrides, name_overrides = overrides_mod.load_overrides(dates_csv, names_csv)

    summary = run(
        document_workbook=config.DOCUMENT_WORKBOOK,
        document_sheet=config.DOCUMENT_SHEET,
        person_workbook=config.PERSON_WORKBOOK,
        person_sheet=config.PERSON_SHEET,
        out_dir=config.OUT_DIR,
        review_dir=config.REVIEW_DIR,
        date_overrides=date_overrides,
        name_overrides=name_overrides,
        approved_themes_path=overrides_dir / "approved-themes.csv",
    )

    # Echo every statistic, one "key: value" line each, in mapping order.
    print("Normalization complete:")
    for label, value in summary.items():
        print(f" {label}: {value}")
# Run the normalizer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()