309 lines
10 KiB
Python
309 lines
10 KiB
Python
"""Normalize Personendatei 2.xlsx into canonical-persons-tree.json."""
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import config
|
|
import dates
|
|
from persons import _strip_accents
|
|
|
|
|
|
_MIN_YEAR = 1700
|
|
_MAX_YEAR = 2100
|
|
# Threshold: if parse_date parses a pure-digit string as a year outside [_MIN_YEAR, _MAX_YEAR],
|
|
# but the year is a plausible typo (1000-3000), don't try serial conversion.
|
|
# Years outside this range (e.g., 7568) are implausible and should try serial conversion.
|
|
_PLAUSIBLE_TYPO_MIN = 1000
|
|
_PLAUSIBLE_TYPO_MAX = 3000
|
|
|
|
|
|
def _parse_year(raw: str | None) -> int | None:
    """Derive a birth/death year from the text of an Excel cell.

    Resolution order:

    1. ``dates.parse_date()`` yields an ISO string whose leading four
       characters form a year inside [_MIN_YEAR, _MAX_YEAR] → that year.
    2. The parsed year is out of range but inside the plausible-typo window
       (e.g. "1023" for "1923") → None; it is never treated as a serial.
    3. The cell consists of digits only → interpret it as an Excel serial
       (days counted from 1899-12-30) and return the resulting year if that
       year is in range.
    4. Anything else → None.

    Args:
        raw: Cell content; None or blank text yields None.

    Returns:
        The year as an int, or None when no acceptable year can be derived.
    """
    if raw is None:
        return None
    text = str(raw).strip()
    if not text:
        return None

    # Only an all-digit cell may later be reinterpreted as a serial number.
    digits_only = re.fullmatch(r"\d+", text) is not None

    # parse_date is tried first (ISO, DD.MM.YYYY, year-only, month+year, ...).
    parsed = dates.parse_date(text)
    if parsed.iso:
        year = int(parsed.iso[:4])
        if _MIN_YEAR <= year <= _MAX_YEAR:
            return year
        # Out of range but plausibly a typo: give up instead of mis-reading
        # the digits as a serial date.
        if _PLAUSIBLE_TYPO_MIN <= year <= _PLAUSIBLE_TYPO_MAX:
            return None

    # Reached when parse_date found nothing, or found an implausible year.
    if not digits_only:
        return None

    serial = int(text)
    if not 1 <= serial <= 80_000:
        return None

    converted = datetime.date(1899, 12, 30) + datetime.timedelta(days=serial)
    if _MIN_YEAR <= converted.year <= _MAX_YEAR:
        return converted.year
    return None
|
|
|
|
|
|
def _parse_generation(raw: str | None) -> int | None:
|
|
"""Extract the generation integer from column A values like 'G 3', 'G3', 'G 0'."""
|
|
if not raw:
|
|
return None
|
|
m = re.search(r"\d+", str(raw))
|
|
return int(m.group()) if m else None
|
|
|
|
|
|
_GEO_SUFFIXES = {"aachen", "mex", "mexiko", "sen", "jun", "jr"}
|
|
|
|
|
|
def _norm_tree(s: str) -> str:
    """Reduce a name to the canonical key used for tree matching.

    Steps, in order:

    - trim whitespace, then surrounding single/double quotes
    - delete every parenthesised substring
    - fold accents via ``_strip_accents`` (imported from ``persons``),
      lowercase, and turn dots into spaces
    - drop tokens listed in ``_GEO_SUFFIXES``
    - join the remaining tokens with single spaces and trim '.', ',' and
      spaces from both ends

    A falsy input yields the empty string.
    """
    unquoted = (s or "").strip().strip("\"'")
    without_parens = re.sub(r"\([^)]*\)", "", unquoted)
    folded = _strip_accents(without_parens).lower().replace(".", " ")
    # str.split() with no argument never yields empty tokens.
    words = (word for word in folded.split() if word not in _GEO_SUFFIXES)
    return " ".join(words).strip("., ")
|
|
|
|
|
|
def _build_index(persons: list[dict]) -> dict[str, list[str]]:
    """Build a normalized-name → [rowId, …] lookup index.

    Each person is registered under up to five keys, all passed through
    ``_norm_tree``: "first last", "last first", "first maiden" (only when a
    maiden name is present), "last" and "first". Empty keys are skipped.

    A rowId is stored at most once per key. Several of the key variants can
    normalize to the same string (e.g. an empty first name, or a maiden name
    equal to the last name); without this guard the same person would appear
    multiple times under one key and a lookup that counts hits would treat a
    unique person as ambiguous.

    Args:
        persons: Person dicts providing "rowId" and, optionally,
            "firstName", "lastName" and "maidenName".

    Returns:
        Mapping from normalized name to the distinct rowIds registered for
        it, in first-seen order.
    """
    index: dict[str, list[str]] = {}

    def _add(key: str, row_id: str) -> None:
        if not key:
            return
        row_ids = index.setdefault(key, [])
        # Colliding key variants of one person must not count as two hits.
        if row_id not in row_ids:
            row_ids.append(row_id)

    for p in persons:
        row_id = p["rowId"]
        first = p.get("firstName") or ""
        last = p.get("lastName") or ""
        maiden = p.get("maidenName") or ""

        _add(_norm_tree(f"{first} {last}"), row_id)
        _add(_norm_tree(f"{last} {first}"), row_id)
        if maiden:
            _add(_norm_tree(f"{first} {maiden}"), row_id)
        _add(_norm_tree(last), row_id)
        _add(_norm_tree(first), row_id)

    return index
|
|
|
|
|
|
def _resolve_one(raw: str, index: dict[str, list[str]]) -> tuple[str | None, str | None]:
    """Look up one raw name in the index.

    Returns ``(row_id, None)`` when exactly one rowId is registered for the
    normalized name. Otherwise returns ``(None, reason)`` with reason
    "empty" (name normalizes to nothing), "not_found" (no entry) or
    "ambiguous" (more than one entry).
    """
    key = _norm_tree(raw)
    if not key:
        return None, "empty"

    candidates = index.get(key, [])
    if not candidates:
        return None, "not_found"
    if len(candidates) > 1:
        return None, "ambiguous"
    return candidates[0], None
|
|
|
|
|
|
def _parse_row(row_num: int, fields: dict) -> dict:
    """Turn one header-mapped spreadsheet row into a person record.

    Date cells that are non-empty but yield no year are preserved verbatim
    in the notes as "[Geburtsdatum: …]" / "[Todesdatum: …]", followed by the
    free-text notes cell.

    Keys starting with '_' are internal; per the original module notes they
    are stripped before JSON output in main().

    Args:
        row_num: Row number, rendered zero-padded to three digits in rowId.
        fields: Mapping from logical column name to cell text (or None).

    Returns:
        The person record as a dict.
    """
    def text(key: str) -> str:
        # A missing or falsy cell is treated as the empty string.
        value = fields.get(key) or ""
        return value.strip()

    def optional(key: str) -> str | None:
        return text(key) or None

    birth_text = text("birth_date")
    death_text = text("death_date")
    birth_year = _parse_year(birth_text)
    death_year = _parse_year(death_text)
    remark = text("notes")

    # (fragment, include?) in output order: birth, death, free-text remark.
    note_candidates = (
        (f"[Geburtsdatum: {birth_text}]", bool(birth_text) and birth_year is None),
        (f"[Todesdatum: {death_text}]", bool(death_text) and death_year is None),
        (remark, bool(remark)),
    )
    notes = " ".join(fragment for fragment, wanted in note_candidates if wanted)

    maiden_name = optional("maiden_name")
    spouse_name = optional("spouse")

    record = {
        "rowId": f"row_{row_num:03d}",
        "firstName": text("first_name"),
        "lastName": text("last_name"),
        "maidenName": maiden_name,
        "alias": None,
        "notes": notes or None,
        "birthYear": birth_year,
        "deathYear": death_year,
        "birthPlace": optional("birth_place"),
        "deathPlace": optional("death_place"),
        "generation": _parse_generation(text("generation")),
        "familyMember": True,
        "_spouse_raw": spouse_name,
        "_bemerkung_raw": remark or None,
    }
    return record
|
|
|
|
|
|
def _deduplicate(persons: list[dict]) -> tuple[list[dict], list[str]]:
|
|
"""Remove duplicate rows. Two-stage:
|
|
|
|
1. Exact (firstName, lastName, birthYear) match.
|
|
2. (firstName, lastName) where the later entry has birthYear=None and an earlier
|
|
entry already has a known birthYear.
|
|
"""
|
|
seen_full: dict[tuple, str] = {} # (first, last, year) -> rowId
|
|
seen_name: dict[tuple, str] = {} # (first, last) -> rowId of first entry with a year
|
|
result: list[dict] = []
|
|
skipped: list[str] = []
|
|
|
|
for p in persons:
|
|
first, last, year = p["firstName"], p["lastName"], p["birthYear"]
|
|
key_full = (first, last, year)
|
|
key_name = (first, last)
|
|
|
|
if key_full in seen_full:
|
|
skipped.append(f"{p['rowId']} duplicates {seen_full[key_full]} ({first} {last}, year={year})")
|
|
continue
|
|
|
|
if year is None and key_name in seen_name:
|
|
skipped.append(f"{p['rowId']} duplicates {seen_name[key_name]} ({first} {last}, no birth year)")
|
|
continue
|
|
|
|
seen_full[key_full] = p["rowId"]
|
|
if year is not None:
|
|
seen_name[key_name] = p["rowId"]
|
|
|
|
result.append(p)
|
|
|
|
return result, skipped
|
|
|
|
|
|
def _resolve_spouses(
    persons: list[dict], index: dict[str, list[str]]
) -> tuple[list[dict], list[dict]]:
    """Derive SPOUSE_OF edges from each person's ``_spouse_raw`` value.

    Each non-blank spouse name is looked up with ``_resolve_one``. A resolved
    pair is emitted once, regardless of which partner's row mentions it
    first; a name that cannot be resolved is recorded with its reason.

    Returns:
        ``(relationships, unresolved)``.
    """
    relationships: list[dict] = []
    unresolved: list[dict] = []
    seen_pairs: set[frozenset] = set()

    for person in persons:
        spouse_name = (person.get("_spouse_raw") or "").strip()
        if not spouse_name:
            continue

        person_id = person["rowId"]
        spouse_id, reason = _resolve_one(spouse_name, index)

        if not spouse_id:
            unresolved.append({
                "rowId": person_id,
                "field": "verheiratet_mit",
                "raw": spouse_name,
                "reason": reason,
            })
            continue

        # Unordered pair, so A→B and B→A count as the same marriage.
        # NOTE(review): a name resolving to the person's own row would
        # produce a self-edge here — confirm whether the data can do that.
        pair = frozenset((person_id, spouse_id))
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)
        relationships.append({
            "personId": person_id,
            "relatedPersonId": spouse_id,
            "type": "SPOUSE_OF",
            "source": "verheiratet_mit",
        })

    return relationships, unresolved
|
|
|
|
|
|
_CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
|
|
_PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
|
|
_AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)
|
|
|
|
|
|
def _parse_bemerkung(
    row_id: str, bemerkung: str, index: dict[str, list[str]]
) -> tuple[list[dict], list[dict], str]:
    """Extract PARENT_OF edges from a Bemerkung (remark) cell.

    The stripped text is matched against ``_CHILD_RE`` first, then
    ``_PARENT_RE``. On a match, the captured names are split on
    ``_AND_RE`` and each one is resolved with ``_resolve_one``:

    - child pattern: the resolved person is the parent of ``row_id``
    - parent pattern: ``row_id`` is the parent of the resolved person

    Names that cannot be resolved are reported with the full original cell
    as "raw".

    Returns:
        ``(relationships, unresolved, remaining_notes)``. With a match,
        remaining_notes is whatever follows the matched span, with leading
        '.', ',', '!' and spaces removed. Without a match it is the stripped
        cell text; for a blank cell it is "".
    """
    if not bemerkung or not bemerkung.strip():
        return [], [], ""

    text = bemerkung.strip()

    child_match = _CHILD_RE.match(text)
    match = child_match or _PARENT_RE.match(text)
    if match is None:
        # Neither pattern applies: the whole text stays a plain note.
        return [], [], text
    row_is_child = child_match is not None

    names = match.group(1).strip().rstrip("!., ")
    relationships: list[dict] = []
    unresolved: list[dict] = []

    for fragment in _AND_RE.split(names):
        candidate = fragment.strip()
        if not candidate:
            continue
        candidate = candidate.rstrip("!., ")

        other_id, reason = _resolve_one(candidate, index)
        if not other_id:
            unresolved.append({
                "rowId": row_id,
                "field": "bemerkung",
                "raw": bemerkung,
                "reason": reason,
            })
            continue

        # PARENT_OF always points from parent (personId) to child.
        if row_is_child:
            parent_id, child_id = other_id, row_id
        else:
            parent_id, child_id = row_id, other_id
        relationships.append({
            "personId": parent_id,
            "relatedPersonId": child_id,
            "type": "PARENT_OF",
            "source": "bemerkung",
            "rawBemerkung": bemerkung,
        })

    leftover = text[match.end():].strip().lstrip(".,! ")
    return relationships, unresolved, leftover
|