Files
familienarchiv/tools/import-normalizer/persons_tree.py
Marcel e326630318 feat(normalizer): add main() CLI to persons_tree
Wires the two-pass pipeline (parse → deduplicate → index → resolve)
into a runnable CLI with --input, --output, and --dry-run flags.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 21:16:21 +02:00

410 lines
14 KiB
Python

"""Normalize Personendatei 2.xlsx into canonical-persons-tree.json."""
import argparse
import datetime
import json
import re
import sys
from pathlib import Path
import config
import dates
from persons import _strip_accents
_MIN_YEAR = 1700
_MAX_YEAR = 2100
# Threshold: if parse_date parses a pure-digit string as a year outside [_MIN_YEAR, _MAX_YEAR],
# but the year is a plausible typo (1000-3000), don't try serial conversion.
# Years outside this range (e.g., 7568) are implausible and should try serial conversion.
_PLAUSIBLE_TYPO_MIN = 1000
_PLAUSIBLE_TYPO_MAX = 3000
def _parse_year(raw: str | None) -> int | None:
"""Extract a birth/death year from an Excel cell string.
Handles three cases:
1. ISO / German / text string parseable by parse_date() → extract year if in range
2. Pure-integer string (out-of-range or unparseable) → try Excel serial conversion
(unless it's a plausible typo year, e.g., "1023" for "1923")
3. Mixed-format or unresolvable → None
Serial conversion only fires for pure-digit strings and implausible years,
preventing typo years like "1023" from being mis-converted as serials.
"""
if raw is None:
return None
s = str(raw).strip()
if not s:
return None
# Check if it's a pure-digit string (candidate for serial conversion)
is_pure_digit = re.fullmatch(r"\d+", s) is not None
# Try parse_date first (handles ISO, DD.MM.YYYY, year-only, month+year, etc.)
result = dates.parse_date(s)
if result.iso:
year = int(result.iso[:4])
if _MIN_YEAR <= year <= _MAX_YEAR:
return year
# Year is out of range. Only try serial conversion if it's an implausible year.
# Plausible typos (e.g., 1023 for 1923) should not be converted as serials.
if is_pure_digit and not (_PLAUSIBLE_TYPO_MIN <= year <= _PLAUSIBLE_TYPO_MAX):
n = int(s)
if 1 <= n <= 80_000:
d = datetime.date(1899, 12, 30) + datetime.timedelta(days=n)
if _MIN_YEAR <= d.year <= _MAX_YEAR:
return d.year
return None
# parse_date() found nothing. Try serial conversion only for pure-digit strings.
if is_pure_digit:
n = int(s)
if 1 <= n <= 80_000:
d = datetime.date(1899, 12, 30) + datetime.timedelta(days=n)
if _MIN_YEAR <= d.year <= _MAX_YEAR:
return d.year
return None
def _parse_generation(raw: str | None) -> int | None:
"""Extract the generation integer from column A values like 'G 3', 'G3', 'G 0'."""
if not raw:
return None
m = re.search(r"\d+", str(raw))
return int(m.group()) if m else None
_GEO_SUFFIXES = {"aachen", "mex", "mexiko", "sen", "jun", "jr"}
def _norm_tree(s: str) -> str:
"""Normalize a name string for tree matching.
- Strip surrounding quotes, remove parenthetical substrings
- Diacritic → ASCII (ä→ae etc.), lowercase, dots → spaces
- Remove known geographic/honorific suffix tokens
- Collapse whitespace
"""
s = (s or "").strip().strip("\"'")
s = re.sub(r"\([^)]*\)", "", s)
s = _strip_accents(s).lower().replace(".", " ")
tokens = [t for t in s.split() if t and t not in _GEO_SUFFIXES]
return " ".join(tokens).strip("., ")
def _build_index(persons: list[dict]) -> dict[str, list[str]]:
"""Build a name → [rowId, …] lookup index with four keys per person."""
index: dict[str, list[str]] = {}
def _add(key: str, row_id: str) -> None:
if key:
index.setdefault(key, []).append(row_id)
for p in persons:
row_id = p["rowId"]
first = p.get("firstName") or ""
last = p.get("lastName") or ""
maiden = p.get("maidenName") or ""
_add(_norm_tree(f"{first} {last}"), row_id)
_add(_norm_tree(f"{last} {first}"), row_id)
if maiden:
_add(_norm_tree(f"{first} {maiden}"), row_id)
_add(_norm_tree(last), row_id)
return index
def _resolve_one(raw: str, index: dict[str, list[str]]) -> tuple[str | None, str | None]:
"""Return (row_id, None) on unique match, (None, reason) otherwise."""
key = _norm_tree(raw)
if not key:
return None, "empty"
hits = index.get(key, [])
if len(hits) == 1:
return hits[0], None
if len(hits) == 0:
return None, "not_found"
return None, "ambiguous"
def _parse_row(row_num: int, fields: dict) -> dict:
"""Produce one person record from a header-mapped row dict.
Internal keys prefixed with '_' are stripped before JSON output in main().
"""
def s(key: str) -> str:
return (fields.get(key) or "").strip()
birth_raw = s("birth_date")
death_raw = s("death_date")
birth_year = _parse_year(birth_raw)
death_year = _parse_year(death_raw)
notes_parts = []
if birth_raw and birth_year is None:
notes_parts.append(f"[Geburtsdatum: {birth_raw}]")
if death_raw and death_year is None:
notes_parts.append(f"[Todesdatum: {death_raw}]")
bemerkung = s("notes")
if bemerkung:
notes_parts.append(bemerkung)
maiden = s("maiden_name") or None
spouse = s("spouse") or None
bemerkung_out = bemerkung or None
return {
"rowId": f"row_{row_num:03d}",
"firstName": s("first_name"),
"lastName": s("last_name"),
"maidenName": maiden,
"alias": None,
"notes": " ".join(notes_parts) or None,
"birthYear": birth_year,
"deathYear": death_year,
"birthPlace": s("birth_place") or None,
"deathPlace": s("death_place") or None,
"generation": _parse_generation(s("generation")),
"familyMember": True,
"_spouse_raw": spouse,
"_bemerkung_raw": bemerkung_out,
}
def _deduplicate(persons: list[dict]) -> tuple[list[dict], list[str]]:
"""Remove duplicate rows. Two-stage:
1. Exact (firstName, lastName, birthYear) match.
2. (firstName, lastName) where the later entry has birthYear=None and an earlier
entry already has a known birthYear.
"""
seen_full: dict[tuple, str] = {} # (first, last, year) -> rowId
seen_name: dict[tuple, str] = {} # (first, last) -> rowId of first entry with a year
result: list[dict] = []
skipped: list[str] = []
for p in persons:
first, last, year = p["firstName"], p["lastName"], p["birthYear"]
key_full = (first, last, year)
key_name = (first, last)
if key_full in seen_full:
skipped.append(f"{p['rowId']} duplicates {seen_full[key_full]} ({first} {last}, year={year})")
continue
if year is None and key_name in seen_name:
skipped.append(f"{p['rowId']} duplicates {seen_name[key_name]} ({first} {last}, no birth year)")
continue
seen_full[key_full] = p["rowId"]
if year is not None:
seen_name[key_name] = p["rowId"]
result.append(p)
return result, skipped
def _resolve_spouses(
persons: list[dict], index: dict[str, list[str]]
) -> tuple[list[dict], list[dict]]:
"""Emit SPOUSE_OF edges from each person's _spouse_raw field."""
relationships: list[dict] = []
unresolved: list[dict] = []
emitted: set[frozenset] = set()
for p in persons:
raw = (p.get("_spouse_raw") or "").strip()
if not raw:
continue
row_id = p["rowId"]
matched_id, reason = _resolve_one(raw, index)
if matched_id:
edge = frozenset([row_id, matched_id])
if edge not in emitted:
emitted.add(edge)
relationships.append({
"personId": row_id,
"relatedPersonId": matched_id,
"type": "SPOUSE_OF",
"source": "verheiratet_mit",
})
else:
unresolved.append({
"rowId": row_id,
"field": "verheiratet_mit",
"raw": raw,
"reason": reason,
})
return relationships, unresolved
_CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
_PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
_AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)
def _parse_bemerkung(
row_id: str, bemerkung: str, index: dict[str, list[str]]
) -> tuple[list[dict], list[dict], str]:
"""Extract PARENT_OF edges from a Bemerkung cell.
Returns (relationships, unresolved, remaining_notes).
Text that doesn't match a parent pattern goes to remaining_notes unchanged.
"""
if not bemerkung or not bemerkung.strip():
return [], [], ""
s = bemerkung.strip()
for pattern, direction in ((_CHILD_RE, "child"), (_PARENT_RE, "parent")):
m = pattern.match(s)
if not m:
continue
# Split the captured group on the first comma or semicolon to separate
# the name part from any trailing description (e.g. ", nach Mexiko emigriert")
raw_names, _, trailing = m.group(1).strip().partition(",")
if not trailing:
raw_names, _, trailing = raw_names.partition(";")
name_part = raw_names.strip().rstrip("!., ")
remainder = trailing.strip().lstrip(".,! ")
parts = [p.strip() for p in _AND_RE.split(name_part) if p.strip()]
rels: list[dict] = []
unres: list[dict] = []
for part in parts:
part = part.rstrip("!., ")
matched_id, reason = _resolve_one(part, index)
if matched_id:
if direction == "child":
rels.append({
"personId": matched_id,
"relatedPersonId": row_id,
"type": "PARENT_OF",
"source": "bemerkung",
"rawBemerkung": bemerkung,
})
else:
rels.append({
"personId": row_id,
"relatedPersonId": matched_id,
"type": "PARENT_OF",
"source": "bemerkung",
"rawBemerkung": bemerkung,
})
else:
unres.append({
"rowId": row_id,
"field": "bemerkung",
"raw": bemerkung,
"reason": reason,
})
return rels, unres, remainder
# No pattern matched — full text goes to notes, nothing to unresolved
return [], [], s
def main() -> None:
parser = argparse.ArgumentParser(
description="Normalize Personendatei 2.xlsx → canonical-persons-tree.json"
)
parser.add_argument(
"--input", default=str(config.PERSON_WORKBOOK),
help="Path to Personendatei 2.xlsx"
)
parser.add_argument(
"--output", default=str(config.OUT_DIR / "canonical-persons-tree.json"),
help="Path for output JSON"
)
parser.add_argument("--dry-run", action="store_true", help="Print stats, skip write")
args = parser.parse_args()
from ingest import read_sheet, build_header_map
rows = read_sheet(Path(args.input), config.PERSON_SHEET)
if not rows:
print("ERROR: sheet is empty", file=sys.stderr)
sys.exit(1)
header_row = [str(v) for v in rows[0]]
fields_map, _ = build_header_map(header_row, config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
# --- Pass 1: parse rows ---
persons_raw: list[dict] = []
for row_num, row in enumerate(rows[1:], start=2):
field_dict = {field: (row[col] if col < len(row) else "") for field, col in fields_map.items()}
if not field_dict.get("last_name", "").strip():
continue
persons_raw.append(_parse_row(row_num, field_dict))
persons, skipped_msgs = _deduplicate(persons_raw)
for msg in skipped_msgs:
print(f" SKIP {msg}", file=sys.stderr)
index = _build_index(persons)
# --- Pass 2: resolve relationships ---
all_rels: list[dict] = []
all_unresolved: list[dict] = []
spouse_rels, spouse_unres = _resolve_spouses(persons, index)
all_rels.extend(spouse_rels)
all_unresolved.extend(spouse_unres)
for p in persons:
bemerkung = p.pop("_bemerkung_raw", None) or ""
p.pop("_spouse_raw", None)
rels, unres, remaining = _parse_bemerkung(p["rowId"], bemerkung, index)
all_rels.extend(rels)
all_unresolved.extend(unres)
if remaining:
existing = p.get("notes") or ""
if remaining not in existing:
p["notes"] = (existing + " " + remaining).strip() if existing else remaining
# --- Stats output ---
spouse_count = sum(1 for r in all_rels if r["type"] == "SPOUSE_OF")
parent_count = sum(1 for r in all_rels if r["type"] == "PARENT_OF")
print(f"{len(persons)} persons parsed")
print(f"{len(all_rels)} relationships emitted ({spouse_count} SPOUSE_OF, {parent_count} PARENT_OF)")
if all_unresolved:
print(f"{len(all_unresolved)} unresolved (see unresolved[] in output)")
if args.dry_run:
print("\n--- dry-run: first 5 unresolved ---")
for u in all_unresolved[:5]:
print(f" {u}")
return
output = {
"generated_at": datetime.datetime.now().isoformat(),
"source": Path(args.input).name,
"stats": {
"persons": len(persons),
"relationships": len(all_rels),
"unresolved": len(all_unresolved),
},
"persons": persons,
"relationships": all_rels,
"unresolved": all_unresolved,
}
out_path = Path(args.output)
out_path.parent.mkdir(exist_ok=True)
out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"{args.output}")
if __name__ == "__main__":
main()