"""Person register parsing, name splitting, alias resolution.""" import difflib import re import unicodedata from collections import Counter from dataclasses import dataclass, field from enum import StrEnum import config import dates _DIACRITIC_MAP = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss", "Ä": "ae", "Ö": "oe", "Ü": "ue"}) def _strip_accents(s: str) -> str: s = s.translate(_DIACRITIC_MAP) s = unicodedata.normalize("NFKD", s) return "".join(c for c in s if not unicodedata.combining(c)) def slugify(last: str, first: str) -> str: raw = f"{last} {first}".strip() raw = _strip_accents(raw).lower() raw = re.sub(r"[^a-z0-9]+", "-", raw).strip("-") return raw or "unknown" @dataclass class Person: person_id: str last_name: str = "" first_name: str = "" maiden_name: str = "" title: str = "" nickname: str = "" extra_given_names: list[str] = field(default_factory=list) birth_date: str | None = None birth_date_raw: str = "" birth_place: str = "" death_date: str | None = None death_date_raw: str = "" death_place: str = "" spouse: str = "" generation: str = "" notes: str = "" aliases: list[str] = field(default_factory=list) provisional: bool = False _QUOTED_RE = re.compile(r'^[""\']\s*(.+?)\s*[""\']$') def parse_register(rows: list[dict]) -> list[Person]: people = [] for r in rows: last = (r.get("last_name") or "").strip() if not last: continue given_raw = (r.get("first_name") or "").strip() givens = [g.strip() for g in given_raw.split(",") if g.strip()] first = givens[0] if givens else "" extra = givens[1:] spouse_raw = (r.get("spouse") or "").strip() nickname = "" m = _QUOTED_RE.match(spouse_raw) if m: nickname = m.group(1) spouse_raw = "" birth = dates.parse_date(r.get("birth_date") or "") death = dates.parse_date(r.get("death_date") or "") people.append(Person( person_id=slugify(last, first), last_name=last, first_name=first, maiden_name=(r.get("maiden_name") or "").strip(), nickname=nickname, extra_given_names=extra, birth_date=birth.iso, birth_date_raw=(r.get("birth_date") or "").strip(), birth_place=(r.get("birth_place") or "").strip(), death_date=death.iso, death_date_raw=(r.get("death_date") or "").strip(), death_place=(r.get("death_place") or "").strip(), spouse=spouse_raw, generation=(r.get("generation") or "").strip(), notes=(r.get("notes") or "").strip(), provisional=False, )) # De-duplicate colliding ids: every member of a colliding group gets a numeric suffix # (-1, -2, …) so no id is left as an ambiguous "base". Unique ids are untouched. counts = Counter(p.person_id for p in people) seen: dict[str, int] = {} for p in people: if counts[p.person_id] > 1: seen[p.person_id] = seen.get(p.person_id, 0) + 1 p.person_id = f"{p.person_id}-{seen[p.person_id]}" return people _GEB_RE = re.compile(r",?\s*geb\.?\s+.+$", re.I) _PAREN_RE = re.compile(r"\(([^)]+)\)\s*$") _MULTI_RE = re.compile(r"\s+(?:und|u)\s+", re.I) def find_known_last_name(segment: str) -> str | None: seg = segment.strip() for ln in config.KNOWN_LAST_NAMES: # config lists longest-first if seg == ln or seg.endswith(" " + ln): return ln return None def split_receivers(raw: str) -> list[str]: if not raw or not raw.strip(): return [] # 0. split on "//" if "//" in raw: out = [] for seg in raw.split("//"): out.extend(split_receivers(seg)) return out cleaned = _GEB_RE.sub("", raw).strip() if not cleaned: # e.g. a "geb. Müller"-only cell strips to empty return [] if not _MULTI_RE.search(cleaned): return [cleaned] shared_last = None pm = _PAREN_RE.search(cleaned) if pm: shared_last = pm.group(1).strip() cleaned = cleaned[:pm.start()].strip() parts = [p.strip() for p in _MULTI_RE.split(cleaned)] parts = [p for p in parts if p and p.lower() != "familie"] if not parts: return [] if len(parts) == 1: return [parts[0]] if shared_last: return [p if " " in p else f"{p} {shared_last}" for p in parts] last_seg = parts[-1] detected = find_known_last_name(last_seg) if detected: result = [] for p in parts[:-1]: if " " not in p and find_known_last_name(p) is None: result.append(f"{p} {detected}") else: result.append(p) result.append(last_seg) return result return parts def _norm(name: str) -> str: return re.sub(r"\s+", " ", _strip_accents(name).lower().replace(".", " ")).strip() class NameClass(StrEnum): RESOLVABLE = "resolvable" UNKNOWN = "unknown" SINGLE_TOKEN = "single_token" RELATIONAL = "relational" COLLECTIVE = "collective" PROSE = "prose" AMBIGUOUS_PAIR = "ambiguous_pair" _QUOTE_CHARS = "\"'\u201c\u201d\u201e\u201a\u2018\u2019" def classify_name(raw: str, given_names: set[str]) -> NameClass: """Classify a (post-split) sender/receiver string by why it may be unresolvable. Precedence (first match wins): UNKNOWN -> PROSE -> COLLECTIVE -> RELATIONAL -> SINGLE_TOKEN -> AMBIGUOUS_PAIR -> RESOLVABLE. """ s = raw.strip() if not s: return NameClass.RESOLVABLE low = s.lower() tokens = s.split() # alpha-only word tokens: "Fam.Cram" -> ["fam","cram"], so collective/relational terms # are matched as whole words (no substring/prefix false positives like "Allerton"). alpha_words = re.findall(r"[a-zäöüß]+", low) if "?" in s or any(m in low for m in config.UNKNOWN_NAME_MARKERS): return NameClass.UNKNOWN if (len(s) > config.PROSE_MAX_LEN or any(c.isdigit() for c in s) or any(q in s for q in _QUOTE_CHARS) or len(tokens) > 3): return NameClass.PROSE if any(w in config.COLLECTIVE_TERMS for w in alpha_words): return NameClass.COLLECTIVE if any(w in config.RELATIONAL_TERMS for w in alpha_words): return NameClass.RELATIONAL if len(tokens) == 1: return NameClass.SINGLE_TOKEN if len(tokens) == 2 and all(_norm(t) in given_names for t in tokens): return NameClass.AMBIGUOUS_PAIR return NameClass.RESOLVABLE # Known limitation: a 4+-token name with no digits/quotes (e.g. "Anna von der Heide") is # classified PROSE. Such multi-particle names are rare here and usually resolve via the # register; if they surface in review, lower-priority than the real prose entries. def build_given_names(register: list[Person], extra: set[str]) -> set[str]: """Set of normalized given names from the register (first + extra given) plus a supplement. Used by classify_name to tell a two-given-name pair (two people) from a first+surname. """ names: set[str] = set() for p in register: if p.first_name: names.add(_norm(p.first_name)) for g in p.extra_given_names: names.add(_norm(g)) for e in extra: names.add(_norm(e)) return names class AliasIndex: def __init__(self, people: list[Person]): self._by_alias: dict[str, str] = {} self._display: dict[str, str] = {} self.known_ids: set[str] = {p.person_id for p in people} first_name_ids: dict[str, list] = {} for p in people: self._display[p.person_id] = f"{p.first_name} {p.last_name}".strip() # Ordered, de-duplicated forms (NOT a set) so alias order is deterministic — NFR-IDEM-01. forms = [f"{p.first_name} {p.last_name}".strip()] if p.maiden_name: forms.append(f"{p.first_name} {p.maiden_name}".strip()) for extra in p.extra_given_names: forms.append(f"{extra} {p.last_name}".strip()) if p.nickname: forms.append(p.nickname) seen = set() for form in forms: if form in seen: continue seen.add(form) key = _norm(form) if key and key not in self._by_alias: self._by_alias[key] = p.person_id p.aliases.append(form) if p.first_name: ids = first_name_ids.setdefault(_norm(p.first_name), []) if p.person_id not in ids: ids.append(p.person_id) # first-name-only alias, only when unambiguous for fname, ids in first_name_ids.items(): if len(ids) == 1 and fname not in self._by_alias: self._by_alias[fname] = ids[0] def resolve(self, name: str): return self._by_alias.get(_norm(name)) def display(self, person_id: str) -> str: return self._display.get(person_id, "") def suggest(self, name: str): keys = list(self._by_alias.keys()) match = difflib.get_close_matches(_norm(name), keys, n=1, cutoff=config.FUZZY_SUGGEST_THRESHOLD) if not match: return None, 0.0 score = difflib.SequenceMatcher(None, _norm(name), match[0]).ratio() return self._by_alias[match[0]], score class ResolutionContext: """Resolves raw name strings to person ids; accumulates provisional persons and review data.""" def __init__(self, alias_index: AliasIndex, name_overrides: dict[str, str], given_names: set[str] | None = None): self.index = alias_index self.name_overrides = name_overrides self.given_names = given_names or set() self.provisional: dict[str, Person] = {} self.unmatched: dict[str, list] = {} self.unresolved: list[tuple] = [] # (raw_name, category, source_row) for non-RESOLVABLE names self._raw_to_pid: dict[str, str] = {} self.override_hits = 0 def _unique_id(self, base: str) -> str: """A provisional id must never collide with a register id or another provisional.""" used = self.index.known_ids | set(self.provisional) pid, n = base, 1 while pid in used: n += 1 pid = f"{base}-{n}" return pid def resolve_one(self, raw_name: str, source_row: int): """Return (person_id, display_name, matched: bool). '' name -> ('', '', True).""" name = (raw_name or "").strip() if not name: return "", "", True if name in self.name_overrides: self.override_hits += 1 pid = self.name_overrides[name] return pid, self.index.display(pid) or name, True pid = self.index.resolve(name) if pid: return pid, self.index.display(pid) or name, True # provisional person (unmatched) — never reuse a register id self.unmatched.setdefault(name, []).append(source_row) category = classify_name(name, self.given_names) if category is not NameClass.RESOLVABLE: self.unresolved.append((name, str(category), source_row)) if name in self._raw_to_pid: return self._raw_to_pid[name], name, False last, first = _last_first(name) pid = self._unique_id(slugify(last, first)) self.provisional[pid] = Person(person_id=pid, last_name=last, first_name=first, provisional=True) self._raw_to_pid[name] = pid return pid, name, False def resolve_sender(self, raw: str, source_row: int): """Senders are split like receivers (REQ-PERS-01). Primary = first part; multi flagged.""" parts = split_receivers(raw) if not parts: return "", "", True, False pid, name, matched = self.resolve_one(parts[0], source_row) for extra in parts[1:]: self.resolve_one(extra, source_row) # register the others as persons too return pid, name, matched, len(parts) > 1 def resolve_receivers(self, raw: str, source_row: int): return [self.resolve_one(part, source_row) for part in split_receivers(raw)] def _last_first(name: str): """Best-effort split of a free name string into (last, first) for slug/provisional building.""" name = name.strip() ln = find_known_last_name(name) if ln: first = name[: -len(ln)].strip() return ln, first tokens = name.split() if len(tokens) >= 2: return tokens[-1], " ".join(tokens[:-1]) return name, ""