338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""Person register parsing, name splitting, alias resolution."""
|
|
import difflib
|
|
import re
|
|
import unicodedata
|
|
from collections import Counter
|
|
from dataclasses import dataclass, field
|
|
from enum import StrEnum
|
|
|
|
import config
|
|
import dates
|
|
|
|
# German umlauts and eszett are transliterated to their digraph spellings before the
# generic accent stripping below. Uppercase umlauts deliberately map to *lowercase*
# digraphs; the callers in this module (slugify, _norm) lowercase the result anyway.
_DIACRITIC_MAP = str.maketrans({
    "ä": "ae",
    "ö": "oe",
    "ü": "ue",
    "ß": "ss",
    "Ä": "ae",
    "Ö": "oe",
    "Ü": "ue",
})


def _strip_accents(s: str) -> str:
    """Return *s* with German umlauts transliterated and all other accents removed.

    The string is first run through ``_DIACRITIC_MAP`` (ä -> ae, ß -> ss, ...), then
    NFKD-decomposed so that remaining accented letters split into base letter plus
    combining mark; the combining marks are dropped.
    """
    decomposed = unicodedata.normalize("NFKD", s.translate(_DIACRITIC_MAP))
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars)
|
|
|
|
|
|
def slugify(last: str, first: str) -> str:
    """Build a lowercase ASCII id slug of the form ``last-first``.

    The two names are joined with a space, accents are stripped, and every run of
    characters outside ``a-z0-9`` collapses into a single hyphen. Leading and trailing
    hyphens are removed. If nothing usable remains, the slug is ``"unknown"``.
    """
    ascii_lower = _strip_accents(f"{last} {first}".strip()).lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_lower).strip("-")
    if not slug:
        return "unknown"
    return slug
|
|
|
|
|
|
@dataclass
|
|
class Person:
|
|
person_id: str
|
|
last_name: str = ""
|
|
first_name: str = ""
|
|
maiden_name: str = ""
|
|
title: str = ""
|
|
nickname: str = ""
|
|
extra_given_names: list[str] = field(default_factory=list)
|
|
birth_date: str | None = None
|
|
birth_date_raw: str = ""
|
|
birth_place: str = ""
|
|
death_date: str | None = None
|
|
death_date_raw: str = ""
|
|
death_place: str = ""
|
|
spouse: str = ""
|
|
generation: str = ""
|
|
notes: str = ""
|
|
aliases: list[str] = field(default_factory=list)
|
|
provisional: bool = False
|
|
|
|
|
|
# Matches a cell that is wrapped entirely in quote marks, capturing the inner text
# (surrounding whitespace inside the quotes is dropped). Straight quotes and the
# typographic marks “ ” „ ‚ ‘ ’ are all accepted, so a value auto-formatted by a
# spreadsheet or word processor (e.g. „Mimi“) is recognised just like "Mimi".
# parse_register applies this to the spouse column to detect nicknames.
_QUOTED_RE = re.compile(
    r"""^["'\u201c\u201d\u201e\u201a\u2018\u2019]\s*(.+?)\s*"""
    r"""["'\u201c\u201d\u201e\u201a\u2018\u2019]$"""
)
|
|
|
|
|
|
def parse_register(rows: list[dict]) -> list[Person]:
    """Turn register rows into ``Person`` records.

    Rows without a ``last_name`` are skipped. The ``first_name`` cell may hold several
    comma-separated given names: the first becomes ``first_name``, the rest go to
    ``extra_given_names``. A ``spouse`` cell that is entirely quoted is interpreted as a
    nickname, and the spouse is then left empty. Dates are parsed with
    ``dates.parse_date`` and the stripped raw cell is kept alongside the ISO value.

    Ids come from ``slugify(last, first)``. When several rows produce the same id, every
    member of that group gets a numeric suffix (``-1``, ``-2``, ... in row order) so that
    no record keeps the ambiguous bare id; ids that are unique are left untouched.
    """

    def cell(row: dict, key: str) -> str:
        # Missing keys and None values both read as the empty string.
        return (row.get(key) or "").strip()

    people = []
    for row in rows:
        last = cell(row, "last_name")
        if not last:
            continue

        given = [g for g in (piece.strip() for piece in cell(row, "first_name").split(",")) if g]
        first = given[0] if given else ""

        spouse = cell(row, "spouse")
        quoted = _QUOTED_RE.match(spouse)
        nickname = ""
        if quoted:
            # A fully quoted spouse cell is really a nickname, not a spouse.
            nickname, spouse = quoted.group(1), ""

        # parse_date receives the cell without stripping.
        birth = dates.parse_date(row.get("birth_date") or "")
        death = dates.parse_date(row.get("death_date") or "")

        person = Person(
            person_id=slugify(last, first),
            last_name=last,
            first_name=first,
            maiden_name=cell(row, "maiden_name"),
            nickname=nickname,
            extra_given_names=given[1:],
            birth_date=birth.iso,
            birth_date_raw=cell(row, "birth_date"),
            birth_place=cell(row, "birth_place"),
            death_date=death.iso,
            death_date_raw=cell(row, "death_date"),
            death_place=cell(row, "death_place"),
            spouse=spouse,
            generation=cell(row, "generation"),
            notes=cell(row, "notes"),
            provisional=False,
        )
        people.append(person)

    # Suffix every member of a colliding id group; unique ids stay as they are.
    totals = Counter(person.person_id for person in people)
    running: dict[str, int] = {}
    for person in people:
        base = person.person_id
        if totals[base] <= 1:
            continue
        running[base] = running.get(base, 0) + 1
        person.person_id = f"{base}-{running[base]}"
    return people
|
|
|
|
|
|
# Maiden-name clause such as " geb. Schmidt" or ", geb Schmidt". The clause ends at the
# next conjunction ("und" / "u" / "u.") or at the end of the string, so that in
# "Anna geb. Schmidt und Hans Müller" only the maiden name is removed and the second
# person survives. The \b keeps a word that merely ends in "geb" from matching.
_GEB_RE = re.compile(r",?\s*\bgeb\.?\s+.+?(?=\s+(?:und|u\.?)\s+|$)", re.I)

# Trailing parenthesised text, e.g. the shared surname in "Anna und Hans (Müller)".
_PAREN_RE = re.compile(r"\(([^)]+)\)\s*$")

# Conjunction between several names: "und", bare "u", or the abbreviation "u.".
# Known trade-off: a standalone middle initial "U" / "U." is also read as a conjunction.
_MULTI_RE = re.compile(r"\s+(?:und|u\.?)\s+", re.I)
|
|
|
|
|
|
def find_known_last_name(segment: str) -> str | None:
    """Return the first entry of ``config.KNOWN_LAST_NAMES`` that *segment* ends with.

    A name matches when the stripped segment equals it or ends with a space followed by
    it. Entries are tried in config order, which is expected to be longest-first, so the
    first hit is the most specific one. Matching is case-sensitive. Returns ``None``
    when nothing matches.
    """
    candidate = segment.strip()
    return next(
        (
            known
            for known in config.KNOWN_LAST_NAMES
            if candidate == known or candidate.endswith(" " + known)
        ),
        None,
    )
|
|
|
|
|
|
def split_receivers(raw: str) -> list[str]:
    """Split a raw sender/receiver cell into individual name strings.

    Steps, in order:

    1. Blank input yields ``[]``.
    2. ``//`` separates independent groups; each group is split recursively.
    3. The maiden-name clause (``_GEB_RE``) is removed; an empty remainder yields ``[]``.
    4. Without a conjunction (``_MULTI_RE``) the cleaned text is returned as one name.
    5. Otherwise a trailing ``(Surname)`` is taken as a shared surname, the text is split
       on the conjunction, and empty parts and the word "Familie" are dropped.
    6. With a shared surname, it is appended to every part that has no space.
    7. Else, if the last part ends in a known last name, that surname is appended to each
       earlier single-token part that is not itself a known last name.
    8. Else the parts are returned unchanged.
    """
    if not (raw and raw.strip()):
        return []

    # 0. split on "//"
    if "//" in raw:
        return [name for chunk in raw.split("//") for name in split_receivers(chunk)]

    text = _GEB_RE.sub("", raw).strip()
    if not text:  # e.g. a "geb. Müller"-only cell strips to empty
        return []
    if _MULTI_RE.search(text) is None:
        return [text]

    family_name = None
    paren = _PAREN_RE.search(text)
    if paren is not None:
        family_name = paren.group(1).strip()
        text = text[:paren.start()].strip()

    names = [piece.strip() for piece in _MULTI_RE.split(text)]
    names = [n for n in names if n and n.lower() != "familie"]
    if len(names) <= 1:
        return names

    if family_name:
        return [n if " " in n else f"{n} {family_name}" for n in names]

    *leading, final = names
    surname = find_known_last_name(final)
    if not surname:
        return names

    # Bare given names before the last part inherit the surname of the last part.
    completed = [
        f"{n} {surname}" if " " not in n and find_known_last_name(n) is None else n
        for n in leading
    ]
    completed.append(final)
    return completed
|
|
|
|
|
|
def _norm(name: str) -> str:
    """Normalize a name for lookup: accent-free, lowercase, dots as spaces, single-spaced."""
    folded = _strip_accents(name).lower()
    without_dots = folded.replace(".", " ")
    return re.sub(r"\s+", " ", without_dots).strip()
|
|
|
|
|
|
class NameClass(StrEnum):
|
|
RESOLVABLE = "resolvable"
|
|
UNKNOWN = "unknown"
|
|
SINGLE_TOKEN = "single_token"
|
|
RELATIONAL = "relational"
|
|
COLLECTIVE = "collective"
|
|
PROSE = "prose"
|
|
AMBIGUOUS_PAIR = "ambiguous_pair"
|
|
|
|
|
|
# Straight and typographic quote marks (" ' “ ” „ ‚ ‘ ’). classify_name treats a string
# containing any of them as prose.
_QUOTE_CHARS = "\"'\u201c\u201d\u201e\u201a\u2018\u2019"
|
|
|
|
|
|
def classify_name(raw: str, given_names: set[str]) -> NameClass:
    """Label a single (already split) sender/receiver string.

    The first matching rule wins, checked in this order:
    UNKNOWN -> PROSE -> COLLECTIVE -> RELATIONAL -> SINGLE_TOKEN -> AMBIGUOUS_PAIR ->
    RESOLVABLE. Blank input is reported as RESOLVABLE.

    *given_names* is a set of normalized given names; two tokens that are both in it are
    taken to be two people rather than first name plus surname.
    """
    text = raw.strip()
    if not text:
        return NameClass.RESOLVABLE

    lowered = text.lower()
    words = text.split()

    if "?" in text or any(marker in lowered for marker in config.UNKNOWN_NAME_MARKERS):
        return NameClass.UNKNOWN

    too_long = len(text) > config.PROSE_MAX_LEN
    has_digit = any(ch.isdigit() for ch in text)
    has_quote = any(quote in text for quote in _QUOTE_CHARS)
    if too_long or has_digit or has_quote or len(words) > 3:
        return NameClass.PROSE

    # Alphabetic runs only ("Fam.Cram" -> ["fam", "cram"]), so collective/relational terms
    # match as whole words and never as a substring of a longer word such as "Allerton".
    alpha_words = re.findall(r"[a-zäöüß]+", lowered)
    if any(word in config.COLLECTIVE_TERMS for word in alpha_words):
        return NameClass.COLLECTIVE
    if any(word in config.RELATIONAL_TERMS for word in alpha_words):
        return NameClass.RELATIONAL

    if len(words) == 1:
        return NameClass.SINGLE_TOKEN

    both_given = len(words) == 2 and all(_norm(word) in given_names for word in words)
    return NameClass.AMBIGUOUS_PAIR if both_given else NameClass.RESOLVABLE
|
|
|
|
|
|
# Known limitation: a name of four or more tokens with no digits or quotes (e.g.
# "Anna von der Heide") is classified PROSE by classify_name. Such multi-particle names
# are rare here and usually resolve via the register; if they surface in review, treat
# them as lower priority than the genuine prose entries.
|
|
|
|
|
|
def build_given_names(register: list[Person], extra: set[str]) -> set[str]:
    """Collect the normalized given names known to the project.

    The result contains every non-empty ``first_name`` and every entry of
    ``extra_given_names`` from *register*, plus the supplementary names in *extra*, all
    passed through ``_norm``. classify_name uses the set to recognise a pair of two given
    names (two people) as opposed to a first name followed by a surname.
    """
    names: set[str] = set()
    for person in register:
        givens = [person.first_name] if person.first_name else []
        givens.extend(person.extra_given_names)
        names.update(_norm(given) for given in givens)
    names.update(_norm(supplement) for supplement in extra)
    return names
|
|
|
|
|
|
class AliasIndex:
    """Lookup from normalized name forms to register person ids.

    For every person the index registers, in this order: "first last", "first maiden"
    (if a maiden name exists), "<extra given name> last" for each extra given name, and
    the nickname. A form is only registered if its normalized key is non-empty and not
    already taken, so on a collision the person that comes first in *people* wins.
    Afterwards a bare first name is registered as an alias when exactly one person has
    that first name and the key is still free.

    Building the index also records each newly registered form in ``Person.aliases``.
    """

    def __init__(self, people: list[Person]):
        """Build the alias table from *people* (mutates each person's ``aliases``)."""
        self._by_alias: dict[str, str] = {}
        self._display: dict[str, str] = {}
        self.known_ids: set[str] = {p.person_id for p in people}
        first_name_ids: dict[str, list[str]] = {}

        for p in people:
            full_name = f"{p.first_name} {p.last_name}".strip()
            self._display[p.person_id] = full_name

            # Ordered, de-duplicated forms (NOT a set) so alias order is deterministic — NFR-IDEM-01.
            forms = [full_name]
            if p.maiden_name:
                forms.append(f"{p.first_name} {p.maiden_name}".strip())
            forms.extend(f"{extra} {p.last_name}".strip() for extra in p.extra_given_names)
            if p.nickname:
                forms.append(p.nickname)

            for form in dict.fromkeys(forms):  # keeps first occurrence, preserves order
                key = _norm(form)
                if key and key not in self._by_alias:
                    self._by_alias[key] = p.person_id
                    # Guard against duplicates so that building a second index over the
                    # same Person objects does not append every alias again.
                    if form not in p.aliases:
                        p.aliases.append(form)

            if p.first_name:
                ids = first_name_ids.setdefault(_norm(p.first_name), [])
                if p.person_id not in ids:
                    ids.append(p.person_id)

        # first-name-only alias, only when unambiguous
        for fname, ids in first_name_ids.items():
            if len(ids) == 1 and fname not in self._by_alias:
                self._by_alias[fname] = ids[0]

    def resolve(self, name: str) -> str | None:
        """Return the person id registered for *name* (normalized), or ``None``."""
        return self._by_alias.get(_norm(name))

    def display(self, person_id: str) -> str:
        """Return the "first last" display name for *person_id*, or ``""`` if unknown."""
        return self._display.get(person_id, "")

    def suggest(self, name: str) -> tuple[str | None, float]:
        """Return ``(person_id, score)`` for the closest registered alias.

        The best candidate is picked with ``difflib.get_close_matches`` using
        ``config.FUZZY_SUGGEST_THRESHOLD`` as cutoff; *score* is the ``SequenceMatcher``
        ratio between the normalized name and that alias. Without a candidate the result
        is ``(None, 0.0)``.
        """
        query = _norm(name)
        match = difflib.get_close_matches(
            query, list(self._by_alias), n=1, cutoff=config.FUZZY_SUGGEST_THRESHOLD
        )
        if not match:
            return None, 0.0
        score = difflib.SequenceMatcher(None, query, match[0]).ratio()
        return self._by_alias[match[0]], score
|
|
|
|
|
|
class ResolutionContext:
    """Resolves raw name strings to person ids; accumulates provisional persons and review data.

    Attributes:
        index: the ``AliasIndex`` used for register lookups.
        name_overrides: exact (stripped) raw name -> person id; checked before the index.
        provisional: provisional person id -> ``Person`` created for an unmatched name.
        unmatched: raw unmatched name -> list of source rows it appeared in.
        ambiguous: ``(raw cell, part, source_row)`` tuples recorded by resolve_receivers.
        override_hits: number of lookups answered from ``name_overrides``.
    """

    def __init__(self, alias_index: AliasIndex, name_overrides: dict[str, str]):
        self.index = alias_index
        self.name_overrides = name_overrides
        self.provisional: dict[str, Person] = {}
        self.unmatched: dict[str, list] = {}
        self.ambiguous: list[tuple] = []
        # Normalized unmatched name -> provisional person id (see resolve_one).
        self._raw_to_pid: dict[str, str] = {}
        self.override_hits = 0

    def _unique_id(self, base: str) -> str:
        """A provisional id must never collide with a register id or another provisional.

        Returns *base* if it is free, otherwise ``base-2``, ``base-3``, ... (first free).
        """
        pid, n = base, 1
        # Test membership directly instead of building a union set on every call.
        while pid in self.index.known_ids or pid in self.provisional:
            n += 1
            pid = f"{base}-{n}"
        return pid

    def resolve_one(self, raw_name: str, source_row: int) -> tuple[str, str, bool]:
        """Return (person_id, display_name, matched: bool). '' name -> ('', '', True).

        Lookup order: ``name_overrides`` (exact stripped name), then the alias index.
        If neither matches, the name is recorded in ``unmatched`` and mapped to a
        provisional person, which is created on first sight and reused afterwards.
        """
        name = (raw_name or "").strip()
        if not name:
            return "", "", True

        if name in self.name_overrides:
            self.override_hits += 1
            pid = self.name_overrides[name]
            return pid, self.index.display(pid) or name, True

        pid = self.index.resolve(name)
        if pid:
            return pid, self.index.display(pid) or name, True

        # provisional person (unmatched) — never reuse a register id
        self.unmatched.setdefault(name, []).append(source_row)

        # Key the cache on the normalized name so spelling variants that differ only in
        # case, spacing, dots or accents share one provisional person instead of
        # producing "x" and "x-2". Fall back to the raw name if normalization leaves
        # nothing (e.g. a cell that is only punctuation).
        key = _norm(name) or name
        if key in self._raw_to_pid:
            return self._raw_to_pid[key], name, False

        last, first = _last_first(name)
        pid = self._unique_id(slugify(last, first))
        self.provisional[pid] = Person(person_id=pid, last_name=last, first_name=first, provisional=True)
        self._raw_to_pid[key] = pid
        return pid, name, False

    def resolve_sender(self, raw: str, source_row: int) -> tuple[str, str, bool, bool]:
        """Senders are split like receivers (REQ-PERS-01). Primary = first part; multi flagged.

        Returns ``(person_id, display_name, matched, is_multi)``; an empty cell yields
        ``("", "", True, False)``.
        """
        parts = split_receivers(raw)
        if not parts:
            return "", "", True, False
        pid, name, matched = self.resolve_one(parts[0], source_row)
        for extra in parts[1:]:
            self.resolve_one(extra, source_row)  # register the others as persons too
        return pid, name, matched, len(parts) > 1

    def resolve_receivers(self, raw: str, source_row: int) -> list[tuple[str, str, bool]]:
        """Resolve every name in a receiver cell; returns one result tuple per name.

        An unmatched two-token part that does not end in a known last name is also
        recorded in ``ambiguous`` as ``(raw, part, source_row)``.
        """
        results = []
        for part in split_receivers(raw):
            pid, name, matched = self.resolve_one(part, source_row)
            if not matched and " " in part and find_known_last_name(part) is None and len(part.split()) == 2:
                self.ambiguous.append((raw, part, source_row))
            results.append((pid, name, matched))
        return results
|
|
|
|
|
|
def _last_first(name: str) -> tuple[str, str]:
    """Best-effort split of a free name string into ``(last, first)``.

    If the name ends in a known last name, that is the last name and whatever precedes
    it is the first name. Otherwise the final whitespace-separated token is the last
    name and the remaining tokens (joined by single spaces) the first name. A single
    token is returned as the last name with an empty first name.
    """
    stripped = name.strip()
    known = find_known_last_name(stripped)
    if known:
        return known, stripped[: -len(known)].strip()

    words = stripped.split()
    if len(words) < 2:
        return stripped, ""
    *given, family = words
    return family, " ".join(given)
|