"""DB-backed person name matcher with fuzzy search.""" from __future__ import annotations import re from rapidfuzz import fuzz, process _PUNCT_RE = re.compile(r"[^\w\s\-]", re.UNICODE) _YEAR_PAT = re.compile(r"^\d{4}$") # Tokens that cannot appear in a real person's first name — used to filter DB # records that are annotations or descriptions masquerading as persons. _NON_NAME_TOKENS: frozenset[str] = frozenset({ # German prepositions "an", "in", "im", "am", "aus", "von", "vom", "nach", "zu", "zum", "zur", "für", "bei", "beim", "mit", "über", "unter", "durch", "gegen", "ohne", "bis", "seit", "des", "dem", "den", # German possessives / pronouns "sein", "seine", "seinen", "seiner", "ihr", "ihre", "ihren", "ihrem", # English prepositions "for", "from", "by", "of", # Spanish prepositions "del", "por", "para", }) class PersonMatcher: """Match person name fragments from free-text queries against known persons. Loaded once at startup from (first_name, last_name) DB rows. At query time, scans for tokens following person-indicator prepositions and fuzzy- matches them against the loaded name variants. Returns the original query text (not the resolved DB name) so the Java resolveNames() mechanism can do its own disambiguation. """ def __init__(self) -> None: self._names: list[str] = [] # lowercase name variants # ── Loading ─────────────────────────────────────────────────────────────── def load(self, rows: list[tuple[str | None, str | None]]) -> None: """Populate from DB rows of (first_name, last_name).""" seen: set[str] = set() for first, last in rows: first = (first or "").strip() last = (last or "").strip() # Skip records whose first_name contains function words — these are # annotations or descriptions in the DB, not real person names. if any(w in _NON_NAME_TOKENS for w in first.lower().split()): continue for variant in _name_variants(first, last): key = variant.lower() if key not in seen: seen.add(key) self._names.append(key) def __len__(self) -> int: return len(self._names) # ── Query-time matching ─────────────────────────────────────────────────── def find_in_query( self, query: str, prepositions: frozenset[str], stop_tokens: frozenset[str] | None = None, threshold: int = 80, ) -> list[tuple[str, str | None]]: """Find person name spans in *query*. Returns a list of ``(original_query_text, anchoring_prep_or_None)`` in left-to-right order. Parameters ---------- prepositions: Person-indicator prepositions for the query language (triggers a scan for the tokens that follow). stop_tokens: Tokens that terminate a name span (prepositions + date-direction words). "de" is a special exception: when immediately followed by a capitalised word it is treated as a name connector (e.g. "de Gruyter") rather than a stop. threshold: Minimum rapidfuzz token_sort_ratio score to accept a match. Strategy -------- Pass 1 — prep-anchored: for each person-indicator preposition found in the token list, collect up to 3 consecutive non-stop, non-year tokens after it and fuzzy-match the resulting span against loaded names. Longest match wins. Pass 2 — full-name scan: scan positions not yet consumed for exact multi-word full-name matches (no preposition anchor required). """ tokens = query.split() clean = [_PUNCT_RE.sub("", t) for t in tokens] lower = [t.lower() for t in clean] # Prepositions always terminate a name span, even without explicit stop_tokens. stops = (stop_tokens or frozenset()) | prepositions consumed: set[int] = set() hits: list[tuple[int, str, str | None]] = [] # (position, text, prep) # Pass 1 — prep-anchored for i, ltok in enumerate(lower): if ltok not in prepositions or i + 1 >= len(tokens): continue # Build candidate span — stop at stop tokens or 4-digit years. # Exception: "de" before a capitalised word is a name connector. span_indices: list[int] = [] j = i + 1 while j < len(tokens) and len(span_indices) < 3: if j in consumed: break t = lower[j] if t in stops or _YEAR_PAT.match(clean[j]): # Allow "de" when the *next* token starts with a capital — # e.g. "Walter de Gruyter". next_clean = clean[j + 1] if j + 1 < len(tokens) else "" if t == "de" and next_clean[:1].isupper(): pass # connector — keep going else: break span_indices.append(j) j += 1 # Try longest match first, then shorter spans for span_len in range(len(span_indices), 0, -1): idx = span_indices[:span_len] span_lower = " ".join(lower[k] for k in idx) if self._is_match(span_lower, threshold): hits.append((idx[0], " ".join(tokens[k] for k in idx), ltok)) consumed.update(idx) break # Pass 2 — full multi-word name scan (exact only, no preposition needed) for span_len in (3, 2): for i in range(len(tokens) - span_len + 1): span_idx = range(i, i + span_len) if any(j in consumed for j in span_idx): continue span_lower = " ".join(lower[i : i + span_len]) if span_lower in self._names: hits.append((i, " ".join(tokens[i : i + span_len]), None)) consumed.update(span_idx) hits.sort(key=lambda h: h[0]) return [(text, prep) for _, text, prep in hits] # ── Internal helpers ────────────────────────────────────────────────────── def _is_match(self, text: str, threshold: int) -> bool: """Return True if *text* fuzzy-matches any loaded name at >= threshold.""" if not self._names or len(text.strip()) < 3: return False text_lower = text.strip().lower() if text_lower in self._names: return True # exact match — fast path result = process.extractOne( text_lower, self._names, scorer=fuzz.token_sort_ratio, score_cutoff=threshold, ) return result is not None # ── helpers ─────────────────────────────────────────────────────────────────── def _name_variants(first: str, last: str) -> list[str]: """Return the name variants to index for a single person.""" variants = [] if first and last: variants.append(f"{first} {last}") if first: variants.append(first) if last: variants.append(last) return variants