Files
familienarchiv/tools/import-normalizer/persons.py
Marcel a177077b40 feat(normalizer): receiver splitting
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 13:59:51 +02:00

142 lines
4.6 KiB
Python

"""Person register parsing, name splitting, alias resolution."""
import re
import unicodedata
from collections import Counter
from dataclasses import dataclass, field
import config
import dates
_DIACRITIC_MAP = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss",
"Ä": "ae", "Ö": "oe", "Ü": "ue"})
def _strip_accents(s: str) -> str:
s = s.translate(_DIACRITIC_MAP)
s = unicodedata.normalize("NFKD", s)
return "".join(c for c in s if not unicodedata.combining(c))
def slugify(last: str, first: str) -> str:
    """Build a lower-case ASCII slug from a last name and a first name.

    The two names are joined with a space, accents are stripped, and every
    run of characters outside ``a-z0-9`` collapses into a single hyphen.
    Leading and trailing hyphens are removed. When nothing usable remains,
    the literal ``"unknown"`` is returned.
    """
    combined = "{} {}".format(last, first).strip()
    ascii_lower = _strip_accents(combined).lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_lower).strip("-")
    if not slug:
        return "unknown"
    return slug
@dataclass
class Person:
person_id: str
last_name: str = ""
first_name: str = ""
maiden_name: str = ""
title: str = ""
nickname: str = ""
extra_given_names: list[str] = field(default_factory=list)
birth_date: str | None = None
birth_date_raw: str = ""
birth_place: str = ""
death_date: str | None = None
death_date_raw: str = ""
death_place: str = ""
spouse: str = ""
generation: str = ""
notes: str = ""
aliases: list[str] = field(default_factory=list)
provisional: bool = False
# Matches a value that is wrapped in quote characters from start to end and
# captures the inner text without surrounding whitespace.
# NOTE(review): the character class lists `"` twice; possibly typographic
# quotes were intended for one of them — confirm against the register data.
_QUOTED_RE = re.compile(r'^[""\']\s*(.+?)\s*[""\']$')


def _cell(row: dict, key: str) -> str:
    """Return ``row[key]`` stripped, treating a missing or empty value as ""."""
    return (row.get(key) or "").strip()


def parse_register(rows: list[dict]) -> list[Person]:
    """Convert register rows into ``Person`` records with unique ids.

    Args:
        rows: One dict per register row. The keys read are ``last_name``,
            ``first_name``, ``maiden_name``, ``spouse``, ``birth_date``,
            ``birth_place``, ``death_date``, ``death_place``, ``generation``
            and ``notes``; missing or empty values are treated as "".

    Returns:
        One ``Person`` per row that has a non-empty last name, in input
        order. Rows without a last name are skipped.

    Per row:
      * ``first_name`` is split on commas; the first entry becomes the first
        name and the rest become ``extra_given_names``.
      * A ``spouse`` value that is entirely quoted is taken as a nickname
        and the spouse field is left empty.
      * Birth and death dates are parsed with ``dates.parse_date``; the
        stripped original text is kept in the ``*_raw`` fields.

    Ids come from ``slugify(last, first)``. When several rows share an id,
    every member of that group receives a numeric suffix (``-1``, ``-2``, …)
    so that no record keeps the ambiguous base id; ids that are already
    unique are left untouched.
    """
    people: list[Person] = []
    for r in rows:
        last = _cell(r, "last_name")
        if not last:
            continue

        givens = [g.strip() for g in _cell(r, "first_name").split(",") if g.strip()]
        first = givens[0] if givens else ""

        spouse = _cell(r, "spouse")
        nickname = ""
        quoted = _QUOTED_RE.match(spouse)
        if quoted:
            # A fully quoted value in the spouse column is a nickname.
            nickname = quoted.group(1)
            spouse = ""

        birth = dates.parse_date(r.get("birth_date") or "")
        death = dates.parse_date(r.get("death_date") or "")

        people.append(Person(
            person_id=slugify(last, first),
            last_name=last,
            first_name=first,
            maiden_name=_cell(r, "maiden_name"),
            nickname=nickname,
            extra_given_names=givens[1:],
            birth_date=birth.iso,
            birth_date_raw=_cell(r, "birth_date"),
            birth_place=_cell(r, "birth_place"),
            death_date=death.iso,
            death_date_raw=_cell(r, "death_date"),
            death_place=_cell(r, "death_place"),
            spouse=spouse,
            generation=_cell(r, "generation"),
            notes=_cell(r, "notes"),
            provisional=False,
        ))

    counts = Counter(p.person_id for p in people)
    # Ids that must not be produced by suffixing: start with the ids that are
    # already unique and add every suffixed id as it is handed out. Without
    # this, "x" occurring twice next to a unique "x-1" would yield two
    # records with the id "x-1".
    taken = {pid for pid, n in counts.items() if n == 1}
    last_suffix: dict[str, int] = {}
    for p in people:
        base = p.person_id
        if counts[base] == 1:
            continue
        n = last_suffix.get(base, 0) + 1
        while f"{base}-{n}" in taken:
            n += 1
        last_suffix[base] = n
        p.person_id = f"{base}-{n}"
        taken.add(p.person_id)
    return people
_GEB_RE = re.compile(r",?\s*geb\.?\s+.+$", re.I)
_PAREN_RE = re.compile(r"\(([^)]+)\)\s*$")
_MULTI_RE = re.compile(r"\s+(?:und|u)\s+", re.I)
def find_known_last_name(segment: str):
    """Return the first known last name that *segment* equals or ends with.

    The segment is stripped, then compared against the entries of
    ``config.KNOWN_LAST_NAMES`` in list order. An entry matches when it is
    the whole segment or its final space-separated suffix. Returns ``None``
    when no entry matches.
    """
    candidate = segment.strip()
    # First match wins; the list is expected to be ordered longest-first so
    # that multi-word last names are preferred over their shorter tails.
    return next(
        (
            name
            for name in config.KNOWN_LAST_NAMES
            if candidate == name or candidate.endswith(" " + name)
        ),
        None,
    )
def split_receivers(raw: str) -> list[str]:
    """Split a raw receiver string into individual receiver names.

    Processing steps:
      1. Blank input yields an empty list.
      2. ``//`` separates independent entries; each is split recursively.
      3. A trailing maiden-name clause (``geb. …``) is removed.
      4. Without an "und"/"u" conjunction the cleaned text is returned as
         the only receiver.
      5. A trailing ``(Name)`` is taken as a shared last name and appended
         to every part that consists of a single word.
      6. Otherwise, if the last part ends with a known last name, that name
         is appended to each earlier single-word part that is not itself a
         known last name.

    Parts equal to "Familie" (case-insensitive) are dropped.

    Args:
        raw: Receiver text as entered; may be empty or ``None``.

    Returns:
        The receiver names in input order; empty when nothing usable remains.
    """
    if not raw or not raw.strip():
        return []

    # "//" separates independent receiver entries; handle each on its own.
    if "//" in raw:
        return [name for seg in raw.split("//") for name in split_receivers(seg)]

    # If the whole entry is a maiden-name clause, stripping it would leave an
    # empty name; keep the original text instead of emitting "".
    cleaned = _GEB_RE.sub("", raw).strip() or raw.strip()
    if not _MULTI_RE.search(cleaned):
        return [cleaned]

    shared_last = None
    pm = _PAREN_RE.search(cleaned)
    if pm:
        shared_last = pm.group(1).strip()
        cleaned = cleaned[:pm.start()].strip()

    parts = [p.strip() for p in _MULTI_RE.split(cleaned)]
    parts = [p for p in parts if p and p.lower() != "familie"]
    if not parts:
        return []

    # Apply the shared last name before the single-part shortcut: the
    # parenthetical has already been cut out of `cleaned`, so returning early
    # would silently drop it (e.g. "Hans und Familie (Müller)").
    if shared_last:
        return [p if " " in p else f"{p} {shared_last}" for p in parts]

    if len(parts) == 1:
        return parts

    last_seg = parts[-1]
    detected = find_known_last_name(last_seg)
    if not detected:
        return parts

    # Earlier parts that are a bare first name inherit the last name found in
    # the final part; multi-word parts and known last names stay unchanged.
    result = [
        p if " " in p or find_known_last_name(p) is not None else f"{p} {detected}"
        for p in parts[:-1]
    ]
    result.append(last_seg)
    return result