Files
familienarchiv/tools/import-normalizer/persons.py
2026-05-25 14:18:09 +02:00

273 lines
10 KiB
Python

"""Person register parsing, name splitting, alias resolution."""
import difflib
import re
import unicodedata
from collections import Counter
from dataclasses import dataclass, field
import config
import dates
_DIACRITIC_MAP = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss",
"Ä": "ae", "Ö": "oe", "Ü": "ue"})
def _strip_accents(s: str) -> str:
s = s.translate(_DIACRITIC_MAP)
s = unicodedata.normalize("NFKD", s)
return "".join(c for c in s if not unicodedata.combining(c))
def slugify(last: str, first: str) -> str:
    """Build a lower-case ASCII slug from a last name and a first name.

    The names are joined as "last first", passed through ``_strip_accents``,
    lower-cased, and every run of characters outside ``a-z0-9`` is collapsed
    into a single hyphen; leading and trailing hyphens are removed.

    Returns:
        The slug, or ``"unknown"`` when nothing usable remains.
    """
    ascii_lower = _strip_accents(f"{last} {first}".strip()).lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_lower).strip("-")
    if not slug:
        return "unknown"
    return slug
@dataclass
class Person:
person_id: str
last_name: str = ""
first_name: str = ""
maiden_name: str = ""
title: str = ""
nickname: str = ""
extra_given_names: list[str] = field(default_factory=list)
birth_date: str | None = None
birth_date_raw: str = ""
birth_place: str = ""
death_date: str | None = None
death_date_raw: str = ""
death_place: str = ""
spouse: str = ""
generation: str = ""
notes: str = ""
aliases: list[str] = field(default_factory=list)
provisional: bool = False
_QUOTED_RE = re.compile(r'^[""\']\s*(.+?)\s*[""\']$')
def parse_register(rows: list[dict]) -> list[Person]:
    """Turn person-register rows into ``Person`` records with unique ids.

    Rows without a ``last_name`` are skipped. ``first_name`` may hold several
    comma-separated given names: the first becomes ``first_name``, the rest
    go to ``extra_given_names``. A ``spouse`` cell matched by ``_QUOTED_RE``
    is stored as the nickname and the spouse is left empty.

    Ids come from ``slugify(last, first)``. When several rows produce the
    same id, every member of that group gets a numeric suffix (-1, -2, ...)
    in row order so no id is left as an ambiguous "base"; ids that are unique
    are untouched. A suffix is skipped when the resulting id already belongs
    to someone else, so the returned ids never repeat.

    Args:
        rows: register rows as dicts; missing keys and ``None`` cells count
            as empty.

    Returns:
        One ``Person`` per usable row, in input order, ``provisional=False``.
    """

    def cell(row: dict, key: str) -> str:
        # Missing keys and None cells both read as the empty string.
        return (row.get(key) or "").strip()

    people = []
    for r in rows:
        last = cell(r, "last_name")
        if not last:
            continue
        givens = [g.strip() for g in cell(r, "first_name").split(",") if g.strip()]
        first = givens[0] if givens else ""
        spouse_raw = cell(r, "spouse")
        nickname = ""
        m = _QUOTED_RE.match(spouse_raw)
        if m:
            # A fully quoted spouse cell holds a nickname, not a spouse.
            nickname = m.group(1)
            spouse_raw = ""
        # parse_date receives the cell as-is; only the stored raw copy is stripped.
        birth = dates.parse_date(r.get("birth_date") or "")
        death = dates.parse_date(r.get("death_date") or "")
        people.append(Person(
            person_id=slugify(last, first),
            last_name=last,
            first_name=first,
            maiden_name=cell(r, "maiden_name"),
            nickname=nickname,
            extra_given_names=givens[1:],
            birth_date=birth.iso,
            birth_date_raw=cell(r, "birth_date"),
            birth_place=cell(r, "birth_place"),
            death_date=death.iso,
            death_date_raw=cell(r, "death_date"),
            death_place=cell(r, "death_place"),
            spouse=spouse_raw,
            generation=cell(r, "generation"),
            notes=cell(r, "notes"),
            provisional=False,
        ))

    counts = Counter(p.person_id for p in people)
    # Ids that stay as they are; suffixed ids are added as they are handed out.
    taken = {pid for pid, n in counts.items() if n == 1}
    last_suffix: dict[str, int] = {}
    for p in people:
        base = p.person_id
        if counts[base] <= 1:
            continue
        n = last_suffix.get(base, 0) + 1
        candidate = f"{base}-{n}"
        # "base-1" may already be someone's own unique slug; move on until free.
        while candidate in taken:
            n += 1
            candidate = f"{base}-{n}"
        last_suffix[base] = n
        taken.add(candidate)
        p.person_id = candidate
    return people
_GEB_RE = re.compile(r",?\s*geb\.?\s+.+$", re.I)
_PAREN_RE = re.compile(r"\(([^)]+)\)\s*$")
_MULTI_RE = re.compile(r"\s+(?:und|u)\s+", re.I)
def find_known_last_name(segment: str) -> str | None:
    """Return the known last name that *segment* is, or ends with, if any.

    *segment* is stripped, then compared against each entry of
    ``config.KNOWN_LAST_NAMES`` in list order. An entry matches when it
    equals the whole text or follows a space at its end. The first matching
    entry is returned; ``None`` when no entry matches.
    """
    candidate = segment.strip()
    return next(
        (
            known
            for known in config.KNOWN_LAST_NAMES  # config lists longest-first
            if candidate == known or candidate.endswith(" " + known)
        ),
        None,
    )
def split_receivers(raw: str) -> list[str]:
    """Split a raw receiver/sender cell into individual name strings.

    Steps, in order:

    1. Empty or whitespace-only input gives ``[]``.
    2. ``"//"`` separates independent groups; each is split on its own and
       the results are concatenated.
    3. A trailing maiden-name clause (``_GEB_RE``) is removed.
    4. Without an "und"/"u" separator (``_MULTI_RE``) the cleaned text is
       returned as a single name.
    5. Otherwise a trailing parenthesised text (``_PAREN_RE``) is taken as a
       shared last name, the rest is split on the separator, and empty parts
       and the word "Familie" are dropped.
    6. With a shared last name, every part without a space gets it appended.
       This also applies when only one part is left, so
       "Hans und Familie (Müller)" yields "Hans Müller" instead of losing the
       last name.
    7. Without one, if the final part ends in a known last name, earlier
       single-word parts that are not themselves a known last name inherit it.

    Args:
        raw: the cell text; may be empty or ``None``.

    Returns:
        The individual names in their original order.
    """
    if not raw or not raw.strip():
        return []
    if "//" in raw:
        out = []
        for seg in raw.split("//"):
            out.extend(split_receivers(seg))
        return out
    cleaned = _GEB_RE.sub("", raw).strip()
    if not cleaned:  # e.g. a "geb. Müller"-only cell strips to empty
        return []
    if not _MULTI_RE.search(cleaned):
        return [cleaned]
    shared_last = None
    pm = _PAREN_RE.search(cleaned)
    if pm:
        shared_last = pm.group(1).strip()
        cleaned = cleaned[:pm.start()].strip()
    parts = [p.strip() for p in _MULTI_RE.split(cleaned)]
    parts = [p for p in parts if p and p.lower() != "familie"]
    if not parts:
        return []
    # Checked before the single-part shortcut so an explicit shared last name
    # is never dropped.
    if shared_last:
        return [p if " " in p else f"{p} {shared_last}" for p in parts]
    if len(parts) == 1:
        return parts
    last_seg = parts[-1]
    detected = find_known_last_name(last_seg)
    if not detected:
        return parts
    # "Hans und Grete Müller": bare first names borrow the final part's last name.
    result = [
        f"{p} {detected}" if " " not in p and find_known_last_name(p) is None else p
        for p in parts[:-1]
    ]
    result.append(last_seg)
    return result
def _norm(name: str) -> str:
    """Normalise a name into the key form used for alias lookups.

    The name is passed through ``_strip_accents`` and lower-cased, full stops
    are replaced by spaces, whitespace runs are collapsed to one space, and
    the result is stripped.
    """
    folded = _strip_accents(name).lower()
    without_dots = folded.replace(".", " ")
    return re.sub(r"\s+", " ", without_dots).strip()
class AliasIndex:
    """Lookup from normalised name forms to person ids.

    For every person the index registers, in this order: "first last",
    "first maiden" (if a maiden name is set), "extra-given last" for each
    extra given name, and the nickname. The first person to claim a
    normalised form keeps it. A bare first name is added as an alias only
    when exactly one person has it and the key is still free.

    Building the index appends each form a person owns to ``person.aliases``.
    A form already present there is not appended again, so building a second
    index over the same persons does not duplicate aliases.
    """

    def __init__(self, people: list[Person]):
        """Index *people*; earlier persons win when two forms normalise alike."""
        self._by_alias: dict[str, str] = {}
        self._display: dict[str, str] = {}
        self.known_ids: set[str] = {p.person_id for p in people}
        first_name_ids: dict[str, list] = {}
        for p in people:
            full_name = f"{p.first_name} {p.last_name}".strip()
            self._display[p.person_id] = full_name
            # Ordered, de-duplicated forms (NOT a set) so alias order is deterministic — NFR-IDEM-01.
            forms = [full_name]
            if p.maiden_name:
                forms.append(f"{p.first_name} {p.maiden_name}".strip())
            for extra in p.extra_given_names:
                forms.append(f"{extra} {p.last_name}".strip())
            if p.nickname:
                forms.append(p.nickname)
            seen = set()
            for form in forms:
                if form in seen:
                    continue
                seen.add(form)
                key = _norm(form)
                if key and key not in self._by_alias:
                    self._by_alias[key] = p.person_id
                    # Guarded so re-indexing the same Person objects is idempotent.
                    if form not in p.aliases:
                        p.aliases.append(form)
            if p.first_name:
                ids = first_name_ids.setdefault(_norm(p.first_name), [])
                if p.person_id not in ids:
                    ids.append(p.person_id)
        # first-name-only alias, only when unambiguous
        for fname, ids in first_name_ids.items():
            if len(ids) == 1 and fname not in self._by_alias:
                self._by_alias[fname] = ids[0]

    def resolve(self, name: str):
        """Return the person id registered for *name*, or ``None``."""
        return self._by_alias.get(_norm(name))

    def display(self, person_id: str) -> str:
        """Return "first last" for *person_id*, or ``""`` if the id is unknown."""
        return self._display.get(person_id, "")

    def suggest(self, name: str):
        """Return ``(person_id, score)`` for the closest alias, else ``(None, 0.0)``.

        Candidates must reach ``config.FUZZY_SUGGEST_THRESHOLD`` in
        ``difflib.get_close_matches``; the score is the ``SequenceMatcher``
        ratio between the normalised *name* and the matched alias key.
        """
        query = _norm(name)  # normalise once, use for both match and score
        match = difflib.get_close_matches(
            query, list(self._by_alias), n=1, cutoff=config.FUZZY_SUGGEST_THRESHOLD
        )
        if not match:
            return None, 0.0
        score = difflib.SequenceMatcher(None, query, match[0]).ratio()
        return self._by_alias[match[0]], score
class ResolutionContext:
    """Resolves raw name strings to person ids; accumulates provisional persons and review data.

    State collected while resolving:
      * ``provisional``   – id -> Person created for names the index does not know
      * ``unmatched``     – raw name -> list of source rows where it appeared
      * ``ambiguous``     – (raw cell, part, source row) tuples flagged by resolve_receivers
      * ``override_hits`` – number of lookups answered from ``name_overrides``
    """

    def __init__(self, alias_index: AliasIndex, name_overrides: dict[str, str]):
        self.index = alias_index
        self.name_overrides = name_overrides
        self.provisional: dict[str, Person] = {}
        self.unmatched: dict[str, list] = {}
        self.ambiguous: list[tuple] = []
        # Raw unmatched name -> provisional id already handed out for it.
        self._raw_to_pid: dict[str, str] = {}
        self.override_hits = 0

    def _unique_id(self, base: str) -> str:
        """A provisional id must never collide with a register id or another provisional."""
        taken = self.index.known_ids | set(self.provisional)
        if base not in taken:
            return base
        # Suffixes start at 2: the unsuffixed base counts as the first holder.
        suffix = 2
        while f"{base}-{suffix}" in taken:
            suffix += 1
        return f"{base}-{suffix}"

    def resolve_one(self, raw_name: str, source_row: int):
        """Return (person_id, display_name, matched: bool). '' name -> ('', '', True).

        Lookup order: exact entry in ``name_overrides``, then the alias
        index. Anything else is recorded in ``unmatched`` and mapped to a
        provisional person, one per distinct raw name.
        """
        name = (raw_name or "").strip()
        if not name:
            return "", "", True

        if name in self.name_overrides:
            self.override_hits += 1
            target = self.name_overrides[name]
            return target, self.index.display(target) or name, True

        hit = self.index.resolve(name)
        if hit:
            return hit, self.index.display(hit) or name, True

        # provisional person (unmatched) — never reuse a register id
        self.unmatched.setdefault(name, []).append(source_row)
        if name not in self._raw_to_pid:
            last, first = _last_first(name)
            new_id = self._unique_id(slugify(last, first))
            self.provisional[new_id] = Person(
                person_id=new_id, last_name=last, first_name=first, provisional=True
            )
            self._raw_to_pid[name] = new_id
        return self._raw_to_pid[name], name, False

    def resolve_sender(self, raw: str, source_row: int):
        """Senders are split like receivers (REQ-PERS-01). Primary = first part; multi flagged.

        Returns (person_id, display_name, matched, multi), where the first
        three describe the primary part and ``multi`` is True when the cell
        held more than one name. An empty cell gives ('', '', True, False).
        """
        parts = split_receivers(raw)
        if not parts:
            return "", "", True, False
        primary, *others = parts
        resolved = self.resolve_one(primary, source_row)
        for other in others:
            self.resolve_one(other, source_row)  # register the others as persons too
        return (*resolved, bool(others))

    def resolve_receivers(self, raw: str, source_row: int):
        """Resolve every name in a receiver cell to (person_id, display_name, matched).

        An unmatched part made of exactly two words, neither ending in a
        known last name, is additionally recorded in ``ambiguous``.
        """
        results = []
        for part in split_receivers(raw):
            outcome = self.resolve_one(part, source_row)
            matched = outcome[2]
            if (
                not matched
                and " " in part
                and find_known_last_name(part) is None
                and len(part.split()) == 2
            ):
                self.ambiguous.append((raw, part, source_row))
            results.append(outcome)
        return results
def _last_first(name: str):
    """Best-effort split of a free name string into (last, first) for slug/provisional building.

    A known last name at the end of the text wins; whatever precedes it is
    the first name. Otherwise the final whitespace-separated token is the
    last name and the remaining tokens, joined by single spaces, the first
    name. A single token (or empty text) is returned as the last name with
    an empty first name.
    """
    name = name.strip()
    known = find_known_last_name(name)
    if known:
        # name ends with the known last name, so dropping that suffix leaves the rest.
        return known, name.removesuffix(known).strip()
    tokens = name.split()
    if len(tokens) < 2:
        return name, ""
    *given, family = tokens
    return family, " ".join(given)