"""Normalize Personendatei 2.xlsx into canonical-persons-tree.json."""

import argparse
import datetime
import json
import re
import sys
from pathlib import Path

import config
import dates
import persons as _persons
from persons import _strip_accents

# Pinned so the committed tree JSON is reproducible and does not churn on every run
# (NFR-IDEM-01) — mirrors writers._FIXED_TS for the xlsx exports.
_GENERATED_AT = "2020-01-01T00:00:00"

_MIN_YEAR = 1700
_MAX_YEAR = 2100

# Threshold: if parse_date parses a pure-digit string as a year outside [_MIN_YEAR, _MAX_YEAR],
# but the year is a plausible typo (1000-3000), don't try serial conversion.
# Years outside this range (e.g., 7568) are implausible and should try serial conversion.
_PLAUSIBLE_TYPO_MIN = 1000
_PLAUSIBLE_TYPO_MAX = 3000

# Base date and upper bound for the spreadsheet serial-number conversion:
# serial n maps to _SERIAL_BASE + n days, and only 1..=_MAX_SERIAL is attempted.
_SERIAL_BASE = datetime.date(1899, 12, 30)
_MAX_SERIAL = 80_000


def _year_from_serial(digits: str) -> int | None:
    """Interpret a pure-digit string as a date serial and return its year.

    Returns None when the number is outside 1.._MAX_SERIAL or when the
    resulting year falls outside [_MIN_YEAR, _MAX_YEAR].
    """
    n = int(digits)
    if not 1 <= n <= _MAX_SERIAL:
        return None
    year = (_SERIAL_BASE + datetime.timedelta(days=n)).year
    return year if _MIN_YEAR <= year <= _MAX_YEAR else None


def _parse_year(raw: str | None) -> int | None:
    """Extract a birth/death year from an Excel cell string.

    Handles three cases:
      1. ISO / German / text string parseable by parse_date() → extract year if in range
      2. Pure-integer string (out-of-range or unparseable) → try Excel serial conversion
         (unless it's a plausible typo year, e.g., "1023" for "1923")
      3. Mixed-format or unresolvable → None

    Serial conversion only fires for pure-digit strings and implausible years,
    preventing typo years like "1023" from being mis-converted as serials.
    """
    if raw is None:
        return None
    s = str(raw).strip()
    if not s:
        return None

    # Only pure-digit strings are candidates for serial conversion.
    is_pure_digit = re.fullmatch(r"\d+", s) is not None

    # Try parse_date first (handles ISO, DD.MM.YYYY, year-only, month+year, etc.)
    result = dates.parse_date(s)
    if result.iso:
        year = int(result.iso[:4])
        if _MIN_YEAR <= year <= _MAX_YEAR:
            return year
        # Year is out of range. Only try serial conversion if it's an implausible year.
        # Plausible typos (e.g., 1023 for 1923) should not be converted as serials.
        if is_pure_digit and not (_PLAUSIBLE_TYPO_MIN <= year <= _PLAUSIBLE_TYPO_MAX):
            return _year_from_serial(s)
        return None

    # parse_date() found nothing. Try serial conversion only for pure-digit strings.
    if is_pure_digit:
        return _year_from_serial(s)
    return None


def _parse_generation(raw: str | None) -> int | None:
    """Extract the generation integer from column A values like 'G 3', 'G3', 'G 0'."""
    if not raw:
        return None
    m = re.search(r"\d+", str(raw))
    return int(m.group()) if m else None


_GEO_SUFFIXES = {"aachen", "mex", "mexiko", "sen", "jun", "jr"}


def _norm_tree(s: str) -> str:
    """Normalize a name string for tree matching.

    - Strip surrounding quotes, remove parenthetical substrings
    - Apply persons._strip_accents, lowercase, dots → spaces
    - Remove known geographic/honorific suffix tokens
    - Collapse whitespace
    """
    s = (s or "").strip().strip("\"'")
    s = re.sub(r"\([^)]*\)", "", s)
    s = _strip_accents(s).lower().replace(".", " ")
    tokens = [t for t in s.split() if t and t not in _GEO_SUFFIXES]
    return " ".join(tokens).strip("., ")


def _build_index(persons: list[dict]) -> dict[str, list[str]]:
    """Build a name → [rowId, …] lookup index with up to four keys per person.

    Keys: "first last", "last first", "first maiden" (when a maiden name is
    present) and "last". Each rowId is stored at most once per key: several of
    a person's keys can normalize to the same string (empty first name, or
    maiden name equal to last name), and a repeated rowId would make
    _resolve_one report that single person as "ambiguous".
    """
    index: dict[str, list[str]] = {}

    def _add(key: str, row_id: str) -> None:
        if not key:
            return
        bucket = index.setdefault(key, [])
        if row_id not in bucket:
            bucket.append(row_id)

    for p in persons:
        row_id = p["rowId"]
        first = p.get("firstName") or ""
        last = p.get("lastName") or ""
        maiden = p.get("maidenName") or ""

        _add(_norm_tree(f"{first} {last}"), row_id)
        _add(_norm_tree(f"{last} {first}"), row_id)
        if maiden:
            _add(_norm_tree(f"{first} {maiden}"), row_id)
        _add(_norm_tree(last), row_id)

    return index


def _resolve_one(raw: str, index: dict[str, list[str]]) -> tuple[str | None, str | None]:
    """Return (row_id, None) on unique match, (None, reason) otherwise.

    reason is one of "empty" (nothing left after normalization), "not_found"
    or "ambiguous" (more than one rowId under the key).
    """
    key = _norm_tree(raw)
    if not key:
        return None, "empty"
    hits = index.get(key, [])
    if len(hits) == 1:
        return hits[0], None
    if len(hits) == 0:
        return None, "not_found"
    return None, "ambiguous"


def _parse_row(row_num: int, fields: dict) -> dict:
    """Produce one person record from a header-mapped row dict.

    Date cells that cannot be reduced to a year are preserved verbatim in the
    notes so the information is not lost.

    Internal keys prefixed with '_' are stripped before JSON output in main().
    """
    def s(key: str) -> str:
        return (fields.get(key) or "").strip()

    birth_raw = s("birth_date")
    death_raw = s("death_date")

    birth_year = _parse_year(birth_raw)
    death_year = _parse_year(death_raw)

    notes_parts = []
    if birth_raw and birth_year is None:
        notes_parts.append(f"[Geburtsdatum: {birth_raw}]")
    if death_raw and death_year is None:
        notes_parts.append(f"[Todesdatum: {death_raw}]")

    bemerkung = s("notes")
    if bemerkung:
        notes_parts.append(bemerkung)

    maiden = s("maiden_name") or None
    spouse = s("spouse") or None
    bemerkung_out = bemerkung or None

    return {
        "rowId": f"row_{row_num:03d}",
        "firstName": s("first_name"),
        "lastName": s("last_name"),
        "maidenName": maiden,
        "alias": None,
        "notes": " ".join(notes_parts) or None,
        "birthYear": birth_year,
        "deathYear": death_year,
        "birthPlace": s("birth_place") or None,
        "deathPlace": s("death_place") or None,
        "generation": _parse_generation(s("generation")),
        "familyMember": True,
        "_spouse_raw": spouse,
        "_bemerkung_raw": bemerkung_out,
    }


def _attach_person_ids(tree_persons: list[dict], raw_dicts: list[dict]) -> None:
    """Attach the register's verbatim person_id to each tree person, in place.

    The register (persons.parse_register) is the sole authority for person_id; it
    slugifies and suffixes colliding ids exactly once. We propagate that id rather
    than re-slugify in the tree, because re-slugifying would not reproduce the
    register's collision suffixes and so would not reconcile 1:1 with the register
    (#670, Gap 3).

    tree_persons and raw_dicts must be the same length and in the same row order —
    parse_register and _parse_row both keep exactly the rows that have a last name.

    Raises:
        ValueError: if parse_register returns a different number of persons than
            tree_persons holds.
    """
    register = _persons.parse_register(raw_dicts)
    if len(tree_persons) != len(register):
        raise ValueError(
            "person_id propagation requires equal length: "
            f"{len(tree_persons)} tree persons vs {len(register)} register persons "
            "(the positional zip would otherwise silently truncate and mis-join ids)"
        )
    for tree_person, register_person in zip(tree_persons, register):
        tree_person["personId"] = register_person.person_id


def _deduplicate(persons: list[dict]) -> tuple[list[dict], list[str]]:
    """Remove duplicate rows and return (kept persons, skip messages).

    Two-stage:
    1. Exact (firstName, lastName, birthYear) match.
    2. (firstName, lastName) where the later entry has birthYear=None and an earlier
       entry already has a known birthYear.
    """
    seen_full: dict[tuple, str] = {}   # (first, last, year) -> rowId
    seen_name: dict[tuple, str] = {}   # (first, last) -> rowId of first entry with a year
    result: list[dict] = []
    skipped: list[str] = []

    for p in persons:
        first, last, year = p["firstName"], p["lastName"], p["birthYear"]
        key_full = (first, last, year)
        key_name = (first, last)

        if key_full in seen_full:
            skipped.append(f"{p['rowId']} duplicates {seen_full[key_full]} ({first} {last}, year={year})")
            continue
        if year is None and key_name in seen_name:
            skipped.append(f"{p['rowId']} duplicates {seen_name[key_name]} ({first} {last}, no birth year)")
            continue

        seen_full[key_full] = p["rowId"]
        if year is not None:
            seen_name[key_name] = p["rowId"]
        result.append(p)

    return result, skipped


def _resolve_spouses(
    persons: list[dict], index: dict[str, list[str]]
) -> tuple[list[dict], list[dict]]:
    """Emit SPOUSE_OF edges from each person's _spouse_raw field.

    Returns (relationships, unresolved). Each couple is emitted once even when
    both partners name each other. A spouse name that resolves to the row
    itself is reported as unresolved with reason "self_reference" instead of
    producing a self-edge.
    """
    relationships: list[dict] = []
    unresolved: list[dict] = []
    emitted: set[frozenset] = set()

    for p in persons:
        raw = (p.get("_spouse_raw") or "").strip()
        if not raw:
            continue
        row_id = p["rowId"]
        matched_id, reason = _resolve_one(raw, index)
        if matched_id == row_id:
            # A person cannot be their own spouse; surface it for manual review.
            matched_id, reason = None, "self_reference"
        if matched_id:
            edge = frozenset([row_id, matched_id])
            if edge not in emitted:
                emitted.add(edge)
                relationships.append({
                    "personId": row_id,
                    "relatedPersonId": matched_id,
                    "type": "SPOUSE_OF",
                    "source": "verheiratet_mit",
                })
        else:
            unresolved.append({
                "rowId": row_id,
                "field": "verheiratet_mit",
                "raw": raw,
                "reason": reason,
            })

    return relationships, unresolved


_CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
_PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
_AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)


def _parse_bemerkung(
    row_id: str, bemerkung: str, index: dict[str, list[str]]
) -> tuple[list[dict], list[dict], str]:
    """Extract PARENT_OF edges from a Bemerkung cell.

    Returns (relationships, unresolved, remaining_notes).
    Text that doesn't match a parent pattern goes to remaining_notes unchanged
    (apart from surrounding whitespace). A name that resolves to row_id itself
    is reported as unresolved with reason "self_reference" instead of producing
    a self-edge.
    """
    if not bemerkung or not bemerkung.strip():
        return [], [], ""

    s = bemerkung.strip()

    for pattern, direction in ((_CHILD_RE, "child"), (_PARENT_RE, "parent")):
        m = pattern.match(s)
        if not m:
            continue

        # Separate the name part from any trailing description
        # (e.g. ", nach Mexiko emigriert"): split on the first comma; only when
        # that leaves no trailing text, fall back to the first semicolon.
        raw_names, _, trailing = m.group(1).strip().partition(",")
        if not trailing:
            raw_names, _, trailing = raw_names.partition(";")

        name_part = raw_names.strip().rstrip("!., ")
        remainder = trailing.strip().lstrip(".,! ")

        parts = [p.strip() for p in _AND_RE.split(name_part) if p.strip()]
        rels: list[dict] = []
        unres: list[dict] = []

        for part in parts:
            part = part.rstrip("!., ")
            matched_id, reason = _resolve_one(part, index)
            if matched_id == row_id:
                # A person cannot be their own parent/child; surface it for review.
                matched_id, reason = None, "self_reference"
            if matched_id:
                if direction == "child":
                    # This row is the child, so the matched person is the parent.
                    rels.append({
                        "personId": matched_id,
                        "relatedPersonId": row_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
                else:
                    rels.append({
                        "personId": row_id,
                        "relatedPersonId": matched_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
            else:
                unres.append({
                    "rowId": row_id,
                    "field": "bemerkung",
                    "raw": bemerkung,
                    "reason": reason,
                })

        return rels, unres, remainder

    # No pattern matched — full text goes to notes, nothing to unresolved
    return [], [], s


def main() -> None:
    """CLI entry point: read the person sheet, resolve relationships, write the JSON tree.

    Exits with status 1 when the sheet is empty. With --dry-run only the
    statistics and the first five unresolved entries are printed; nothing is written.
    """
    parser = argparse.ArgumentParser(
        description="Normalize Personendatei 2.xlsx → canonical-persons-tree.json"
    )
    parser.add_argument(
        "--input", default=str(config.PERSON_WORKBOOK), help="Path to Personendatei 2.xlsx"
    )
    parser.add_argument(
        "--output",
        default=str(config.OUT_DIR / "canonical-persons-tree.json"),
        help="Path for output JSON",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print stats, skip write")
    args = parser.parse_args()

    from ingest import read_sheet, build_header_map

    rows = read_sheet(Path(args.input), config.PERSON_SHEET)
    if not rows:
        print("ERROR: sheet is empty", file=sys.stderr)
        sys.exit(1)

    header_row = [str(v) for v in rows[0]]
    fields_map, _ = build_header_map(header_row, config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)

    # --- Pass 1: parse rows ---
    persons_raw: list[dict] = []
    raw_dicts: list[dict] = []
    for row_num, row in enumerate(rows[1:], start=2):
        field_dict = {field: (row[col] if col < len(row) else "") for field, col in fields_map.items()}
        # `or ""` mirrors _parse_row: a None cell counts as "no last name"
        # instead of raising AttributeError on .strip().
        if not (field_dict.get("last_name") or "").strip():
            continue
        persons_raw.append(_parse_row(row_num, field_dict))
        raw_dicts.append(field_dict)

    # Propagate the register's verbatim person_id before dedup so the tree reconciles 1:1
    # with canonical-persons.xlsx (#670, Gap 3).
    _attach_person_ids(persons_raw, raw_dicts)

    persons, skipped_msgs = _deduplicate(persons_raw)
    for msg in skipped_msgs:
        print(f" SKIP {msg}", file=sys.stderr)

    index = _build_index(persons)

    # --- Pass 2: resolve relationships ---
    all_rels: list[dict] = []
    all_unresolved: list[dict] = []

    spouse_rels, spouse_unres = _resolve_spouses(persons, index)
    all_rels.extend(spouse_rels)
    all_unresolved.extend(spouse_unres)

    for p in persons:
        # Drop the internal '_' keys so they never reach the JSON output.
        bemerkung = p.pop("_bemerkung_raw", None) or ""
        p.pop("_spouse_raw", None)

        rels, unres, remaining = _parse_bemerkung(p["rowId"], bemerkung, index)
        all_rels.extend(rels)
        all_unresolved.extend(unres)

        if remaining:
            existing = p.get("notes") or ""
            # _parse_row already put the Bemerkung text into notes; avoid repeating it.
            if remaining not in existing:
                p["notes"] = (existing + " " + remaining).strip() if existing else remaining

    # --- Stats output ---
    spouse_count = sum(1 for r in all_rels if r["type"] == "SPOUSE_OF")
    parent_count = sum(1 for r in all_rels if r["type"] == "PARENT_OF")
    print(f"✓ {len(persons)} persons parsed")
    print(f"✓ {len(all_rels)} relationships emitted ({spouse_count} SPOUSE_OF, {parent_count} PARENT_OF)")
    if all_unresolved:
        print(f"⚠ {len(all_unresolved)} unresolved (see unresolved[] in output)")

    if args.dry_run:
        print("\n--- dry-run: first 5 unresolved ---")
        for u in all_unresolved[:5]:
            print(f" {u}")
        return

    output = {
        "generated_at": _GENERATED_AT,
        "source": Path(args.input).name,
        "stats": {
            "persons": len(persons),
            "relationships": len(all_rels),
            "unresolved": len(all_unresolved),
        },
        "persons": persons,
        "relationships": all_rels,
        "unresolved": all_unresolved,
    }
    out_path = Path(args.output)
    # parents=True so an --output path inside a not-yet-existing nested directory works.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"→ {args.output}")


if __name__ == "__main__":
    main()