Files
familienarchiv/tools/import-normalizer/persons.py
2026-05-25 14:04:11 +02:00

198 lines
6.9 KiB
Python

"""Person register parsing, name splitting, alias resolution."""
import difflib
import re
import unicodedata
from collections import Counter
from dataclasses import dataclass, field
import config
import dates
_DIACRITIC_MAP = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss",
"Ä": "ae", "Ö": "oe", "Ü": "ue"})
def _strip_accents(s: str) -> str:
s = s.translate(_DIACRITIC_MAP)
s = unicodedata.normalize("NFKD", s)
return "".join(c for c in s if not unicodedata.combining(c))
def slugify(last: str, first: str) -> str:
raw = f"{last} {first}".strip()
raw = _strip_accents(raw).lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw).strip("-")
return raw or "unknown"
@dataclass
class Person:
person_id: str
last_name: str = ""
first_name: str = ""
maiden_name: str = ""
title: str = ""
nickname: str = ""
extra_given_names: list[str] = field(default_factory=list)
birth_date: str | None = None
birth_date_raw: str = ""
birth_place: str = ""
death_date: str | None = None
death_date_raw: str = ""
death_place: str = ""
spouse: str = ""
generation: str = ""
notes: str = ""
aliases: list[str] = field(default_factory=list)
provisional: bool = False
_QUOTED_RE = re.compile(r'^[""\']\s*(.+?)\s*[""\']$')
def parse_register(rows: list[dict]) -> list[Person]:
people = []
for r in rows:
last = (r.get("last_name") or "").strip()
if not last:
continue
given_raw = (r.get("first_name") or "").strip()
givens = [g.strip() for g in given_raw.split(",") if g.strip()]
first = givens[0] if givens else ""
extra = givens[1:]
spouse_raw = (r.get("spouse") or "").strip()
nickname = ""
m = _QUOTED_RE.match(spouse_raw)
if m:
nickname = m.group(1)
spouse_raw = ""
birth = dates.parse_date(r.get("birth_date") or "")
death = dates.parse_date(r.get("death_date") or "")
people.append(Person(
person_id=slugify(last, first),
last_name=last, first_name=first, maiden_name=(r.get("maiden_name") or "").strip(),
nickname=nickname, extra_given_names=extra,
birth_date=birth.iso, birth_date_raw=(r.get("birth_date") or "").strip(), birth_place=(r.get("birth_place") or "").strip(),
death_date=death.iso, death_date_raw=(r.get("death_date") or "").strip(), death_place=(r.get("death_place") or "").strip(),
spouse=spouse_raw, generation=(r.get("generation") or "").strip(),
notes=(r.get("notes") or "").strip(), provisional=False,
))
# De-duplicate colliding ids: every member of a colliding group gets a numeric suffix
# (-1, -2, …) so no id is left as an ambiguous "base". Unique ids are untouched.
counts = Counter(p.person_id for p in people)
seen: dict[str, int] = {}
for p in people:
if counts[p.person_id] > 1:
seen[p.person_id] = seen.get(p.person_id, 0) + 1
p.person_id = f"{p.person_id}-{seen[p.person_id]}"
return people
_GEB_RE = re.compile(r",?\s*geb\.?\s+.+$", re.I)
_PAREN_RE = re.compile(r"\(([^)]+)\)\s*$")
_MULTI_RE = re.compile(r"\s+(?:und|u)\s+", re.I)
def find_known_last_name(segment: str) -> str | None:
seg = segment.strip()
for ln in config.KNOWN_LAST_NAMES: # config lists longest-first
if seg == ln or seg.endswith(" " + ln):
return ln
return None
def split_receivers(raw: str) -> list[str]:
if not raw or not raw.strip():
return []
# 0. split on "//"
if "//" in raw:
out = []
for seg in raw.split("//"):
out.extend(split_receivers(seg))
return out
cleaned = _GEB_RE.sub("", raw).strip()
if not cleaned: # e.g. a "geb. Müller"-only cell strips to empty
return []
if not _MULTI_RE.search(cleaned):
return [cleaned]
shared_last = None
pm = _PAREN_RE.search(cleaned)
if pm:
shared_last = pm.group(1).strip()
cleaned = cleaned[:pm.start()].strip()
parts = [p.strip() for p in _MULTI_RE.split(cleaned)]
parts = [p for p in parts if p and p.lower() != "familie"]
if not parts:
return []
if len(parts) == 1:
return [parts[0]]
if shared_last:
return [p if " " in p else f"{p} {shared_last}" for p in parts]
last_seg = parts[-1]
detected = find_known_last_name(last_seg)
if detected:
result = []
for p in parts[:-1]:
if " " not in p and find_known_last_name(p) is None:
result.append(f"{p} {detected}")
else:
result.append(p)
result.append(last_seg)
return result
return parts
def _norm(name: str) -> str:
return re.sub(r"\s+", " ", _strip_accents(name).lower().replace(".", " ")).strip()
class AliasIndex:
def __init__(self, people: list[Person]):
self._by_alias: dict[str, str] = {}
self._display: dict[str, str] = {}
self.known_ids: set[str] = {p.person_id for p in people}
first_name_ids: dict[str, list] = {}
for p in people:
self._display[p.person_id] = f"{p.first_name} {p.last_name}".strip()
# Ordered, de-duplicated forms (NOT a set) so alias order is deterministic — NFR-IDEM-01.
forms = [f"{p.first_name} {p.last_name}".strip()]
if p.maiden_name:
forms.append(f"{p.first_name} {p.maiden_name}".strip())
for extra in p.extra_given_names:
forms.append(f"{extra} {p.last_name}".strip())
if p.nickname:
forms.append(p.nickname)
seen = set()
for form in forms:
if form in seen:
continue
seen.add(form)
key = _norm(form)
if key and key not in self._by_alias:
self._by_alias[key] = p.person_id
p.aliases.append(form)
if p.first_name:
ids = first_name_ids.setdefault(_norm(p.first_name), [])
if p.person_id not in ids:
ids.append(p.person_id)
# first-name-only alias, only when unambiguous
for fname, ids in first_name_ids.items():
if len(ids) == 1 and fname not in self._by_alias:
self._by_alias[fname] = ids[0]
def resolve(self, name: str):
return self._by_alias.get(_norm(name))
def display(self, person_id: str) -> str:
return self._display.get(person_id, "")
def suggest(self, name: str):
keys = list(self._by_alias.keys())
match = difflib.get_close_matches(_norm(name), keys, n=1, cutoff=config.FUZZY_SUGGEST_THRESHOLD)
if not match:
return None, 0.0
score = difflib.SequenceMatcher(None, _norm(name), match[0]).ratio()
return self._by_alias[match[0]], score