feat(normalizer): add main() CLI to persons_tree

Wires the two-pass pipeline (parse → deduplicate → index → resolve)
into a runnable CLI with --input, --output, and --dry-run flags.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Marcel
2026-05-25 21:16:21 +02:00
parent 34c40cb0ee
commit e326630318
2 changed files with 121 additions and 0 deletions

View File

@@ -310,3 +310,100 @@ def _parse_bemerkung(
# No pattern matched — full text goes to notes, nothing to unresolved
return [], [], s
def main() -> None:
    """Run the persons-tree normalizer from the command line.

    Reads the person sheet from the workbook given by ``--input``, parses
    each data row, deduplicates the parsed persons, builds a lookup index
    and then resolves spouse and remark-derived relationships.  Summary
    statistics are printed to stdout.  Unless ``--dry-run`` is given, the
    result is written as UTF-8 JSON to ``--output``.

    Exits with status 1 when the sheet contains no rows.
    """
    parser = argparse.ArgumentParser(
        description="Normalize Personendatei 2.xlsx → canonical-persons-tree.json"
    )
    parser.add_argument(
        "--input", default=str(config.PERSON_WORKBOOK),
        help="Path to Personendatei 2.xlsx"
    )
    parser.add_argument(
        "--output", default=str(config.OUT_DIR / "canonical-persons-tree.json"),
        help="Path for output JSON"
    )
    parser.add_argument("--dry-run", action="store_true", help="Print stats, skip write")
    args = parser.parse_args()

    # Imported at call time, as in the original, so merely importing this
    # module does not pull in the ingest module.
    from ingest import read_sheet, build_header_map

    rows = read_sheet(Path(args.input), config.PERSON_SHEET)
    if not rows:
        print("ERROR: sheet is empty", file=sys.stderr)
        sys.exit(1)

    header_row = [str(v) for v in rows[0]]
    fields_map, _ = build_header_map(
        header_row, config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS
    )

    # --- Pass 1: parse rows ---
    persons_raw: list[dict] = []
    # start=2: the header is sheet row 1, so the first data row is row 2.
    for row_num, row in enumerate(rows[1:], start=2):
        # Rows shorter than the header yield "" for the missing columns.
        field_dict = {
            field: (row[col] if col < len(row) else "")
            for field, col in fields_map.items()
        }
        # Skip rows without a last name.  The value is coerced through str()
        # so a None or non-string cell does not raise AttributeError.
        last_name = field_dict.get("last_name")
        if not str(last_name or "").strip():
            continue
        persons_raw.append(_parse_row(row_num, field_dict))

    persons, skipped_msgs = _deduplicate(persons_raw)
    for msg in skipped_msgs:
        print(f" SKIP {msg}", file=sys.stderr)

    index = _build_index(persons)

    # --- Pass 2: resolve relationships ---
    all_rels: list[dict] = []
    all_unresolved: list[dict] = []

    spouse_rels, spouse_unres = _resolve_spouses(persons, index)
    all_rels.extend(spouse_rels)
    all_unresolved.extend(spouse_unres)

    for p in persons:
        # Remove the temporary raw fields so they never reach the output.
        bemerkung = p.pop("_bemerkung_raw", None) or ""
        p.pop("_spouse_raw", None)

        rels, unres, remaining = _parse_bemerkung(p["rowId"], bemerkung, index)
        all_rels.extend(rels)
        all_unresolved.extend(unres)

        if remaining:
            existing = p.get("notes") or ""
            # Append the leftover remark text only if the notes do not
            # already contain it.
            if remaining not in existing:
                p["notes"] = (existing + " " + remaining).strip() if existing else remaining

    # --- Stats output ---
    spouse_count = sum(1 for r in all_rels if r["type"] == "SPOUSE_OF")
    parent_count = sum(1 for r in all_rels if r["type"] == "PARENT_OF")
    print(f"{len(persons)} persons parsed")
    print(f"{len(all_rels)} relationships emitted ({spouse_count} SPOUSE_OF, {parent_count} PARENT_OF)")
    if all_unresolved:
        print(f"{len(all_unresolved)} unresolved (see unresolved[] in output)")

    if args.dry_run:
        print("\n--- dry-run: first 5 unresolved ---")
        for u in all_unresolved[:5]:
            print(f" {u}")
        return

    output = {
        "generated_at": datetime.datetime.now().isoformat(),
        "source": Path(args.input).name,
        "stats": {
            "persons": len(persons),
            "relationships": len(all_rels),
            "unresolved": len(all_unresolved),
        },
        "persons": persons,
        "relationships": all_rels,
        "unresolved": all_unresolved,
    }

    out_path = Path(args.output)
    # parents=True: create missing intermediate directories too, so a nested
    # --output path does not fail with FileNotFoundError.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
    print(args.output)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()