feat(nlp-service): replace spaCy NER with DB-backed PersonMatcher
Rule-based pipeline: persons matched via rapidfuzz against all known names loaded from DB at startup. Fixes first-name-only extraction (Eugenie, Herbert), merged-span bug (Herbert + Eugenie de Gruyter), false positives on compound nouns, and EN/ES model failures. Date extraction unchanged (regex). No spaCy models required. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,46 +1,33 @@
|
||||
"""Rule-based NLP pipeline: dates via regex, persons via DB-backed matcher."""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from datetime import date
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import dateparser
|
||||
import spacy
|
||||
from spacy.language import Language
|
||||
|
||||
from models import ParseResponse
|
||||
from person_matcher import PersonMatcher
|
||||
|
||||
# ── Language model registry ──────────────────────────────────────────────────
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
_MODEL_NAMES: dict[str, str] = {
|
||||
"de": "de_core_news_sm",
|
||||
"en": "en_core_web_sm",
|
||||
"es": "es_core_news_sm",
|
||||
}
|
||||
# ── Module-level PersonMatcher (set at startup) ───────────────────────────────
|
||||
|
||||
_nlp_cache: dict[str, Language] = {}
|
||||
_matcher: PersonMatcher | None = None
|
||||
|
||||
|
||||
def get_nlp(lang: str) -> Language:
    """Return the spaCy pipeline for *lang*, loading it on first request.

    Loaded pipelines are kept in ``_nlp_cache`` so each model is loaded
    at most once per process.

    Raises:
        ValueError: if *lang* has no entry in ``_MODEL_NAMES``.
    """
    if lang not in _MODEL_NAMES:
        supported = list(_MODEL_NAMES)
        raise ValueError(f"Unsupported language: {lang!r}. Valid: {supported}")

    if lang in _nlp_cache:
        return _nlp_cache[lang]

    # First request for this language: load the model and remember it.
    pipeline = spacy.load(_MODEL_NAMES[lang])
    _nlp_cache[lang] = pipeline
    return pipeline
|
||||
def set_person_matcher(m: PersonMatcher) -> None:
    """Install *m* as the module-level matcher read by the pipeline."""
    global _matcher
    _matcher = m
|
||||
|
||||
|
||||
def load_all_models() -> None:
    """Load the pipeline of every language listed in ``_MODEL_NAMES``.

    Delegates to ``get_nlp``, so each model ends up in its cache.
    """
    language_codes = tuple(_MODEL_NAMES)
    for code in language_codes:
        get_nlp(code)
|
||||
def get_person_matcher() -> PersonMatcher | None:
    """Return the module-level matcher, or ``None`` if none has been set."""
    return _matcher
|
||||
|
||||
|
||||
# ── Step 1: Person name extraction ──────────────────────────────────────────
|
||||
|
||||
def extract_person_names(doc) -> list[str]:
    """Collect the text of every entity labelled ``PER``, in ``doc.ents`` order."""
    names: list[str] = []
    for entity in doc.ents:
        if entity.label_ == "PER":
            names.append(entity.text)
    return names
|
||||
|
||||
|
||||
# ── Step 2: Role detection ───────────────────────────────────────────────────
|
||||
# ── Preposition sets ──────────────────────────────────────────────────────────
|
||||
|
||||
_SENDER_PREPS: dict[str, frozenset[str]] = {
|
||||
"de": frozenset({"von", "vom"}),
|
||||
@@ -54,43 +41,12 @@ _RECEIVER_PREPS: dict[str, frozenset[str]] = {
|
||||
"es": frozenset({"para", "a"}),
|
||||
}
|
||||
|
||||
_ALL_PERSON_PREPS: dict[str, frozenset[str]] = {
|
||||
lang: _SENDER_PREPS[lang] | _RECEIVER_PREPS[lang]
|
||||
for lang in ("de", "en", "es")
|
||||
}
|
||||
|
||||
def detect_person_role(doc, per_spans: list, lang: str) -> str:
    """Classify the single person in a query as 'sender', 'receiver' or 'any'.

    Anything other than exactly one PER span yields 'any'; per the original
    note, two-person queries get their direction from list position on the
    Java side.

    The span root's dependency children are inspected first (only those
    with dep ``case``, ``prep`` or ``mo``). If none of them is a known
    preposition, the token directly before the span is checked instead.
    """
    if len(per_spans) != 1:
        return "any"

    span = per_spans[0]
    head = span.root
    sender_words = _SENDER_PREPS[lang]
    receiver_words = _RECEIVER_PREPS[lang]

    def classify(word: str) -> str | None:
        # Sender prepositions take precedence, matching the original order.
        if word in sender_words:
            return "sender"
        if word in receiver_words:
            return "receiver"
        return None

    # Primary signal: prepositions attached to the span root in the parse.
    for child in head.children:
        if child.dep_ not in ("case", "prep", "mo"):
            continue
        role = classify(child.lower_)
        if role is not None:
            return role

    # Fallback: the token immediately preceding the span, if there is one.
    if span.start > 0:
        role = classify(doc[span.start - 1].lower_)
        if role is not None:
            return role

    return "any"
|
||||
|
||||
|
||||
# ── Step 3: Date parsing ─────────────────────────────────────────────────────
|
||||
|
||||
_YEAR_RE = re.compile(r"^\d{4}$")
|
||||
# ── Date direction tokens ─────────────────────────────────────────────────────
|
||||
|
||||
_DATE_BEFORE: dict[str, frozenset[str]] = {
|
||||
"de": frozenset({"vor"}),
|
||||
@@ -110,130 +66,219 @@ _DATE_BETWEEN: dict[str, frozenset[str]] = {
|
||||
"es": frozenset({"entre"}),
|
||||
}
|
||||
|
||||
# ── Stopword lists ────────────────────────────────────────────────────────────
|
||||
|
||||
def _parse_date_text(text: str, lang: str) -> date | None:
    """Turn a date expression into a ``date``, or ``None`` if unparseable.

    If the stripped text matches ``_YEAR_RE`` and its value lies strictly
    between 1000 and 3000, January 1st of that year is returned. Everything
    else (including out-of-range years) is handed to ``dateparser``.
    """
    candidate = text.strip()

    if _YEAR_RE.match(candidate):
        year = int(candidate)
        if 1000 < year < 3000:
            return date(year, 1, 1)

    parser_settings = {
        "PREFER_DAY_OF_MONTH": "first",
        "RETURN_AS_TIMEZONE_AWARE": False,
    }
    result = dateparser.parse(candidate, languages=[lang], settings=parser_settings)
    if result:
        return result.date()
    return None
|
||||
# Per-language stopwords removed during keyword extraction.
# Every entry must be lowercase: the keyword extractor lowercases each token
# before testing membership, so a capitalised entry (previously "Jahr" and
# "Jahren") could never match and "jahren" slipped through as a keyword.
# Duplicate literals from the earlier version have been dropped; they had no
# effect inside a frozenset.
_STOPWORDS: dict[str, frozenset[str]] = {
    "de": frozenset({
        "der", "die", "das", "des", "dem", "den",
        "ein", "eine", "einem", "einen", "einer", "eines",
        "er", "sie", "es", "wir", "ihr", "ich", "du",
        "und", "oder", "aber", "doch", "auch", "noch", "nur",
        "in", "an", "auf", "aus", "bei", "mit", "nach", "von", "vom",
        "vor", "zu", "zur", "zum", "durch", "für", "über", "unter",
        "zwischen", "gegen", "ohne", "um", "bis", "seit", "wegen",
        "ist", "sind", "war", "waren", "wird", "werden",
        "hat", "haben", "hatte", "hatten",
        "sein", "seine", "seinen", "seiner", "seines",
        "ihre", "ihren", "ihrer", "ihrem", "ihres",
        "nicht", "kein", "keine", "keinen", "keinem", "keines",
        "so", "wie", "als", "da", "hier", "dort", "wo", "wer", "was",
        "im", "am", "beim", "ins", "ans",
        "ja", "nein", "denn", "wenn", "weil", "dass", "ob", "damit",
        "alle", "alles", "mehr", "sehr", "viel", "wenig",
        "diesem", "dieser", "dieses", "diese", "diesen",
        "jetzt", "dann", "nun", "schon", "wohl", "wurde", "wurden",
        "worden", "geschrieben",
        "jahr", "jahre", "jahren",
    }),
    "en": frozenset({
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "about", "as", "into",
        "through", "is", "are", "was", "were", "be", "been", "being",
        "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "must", "shall", "can",
        "i", "you", "he", "she", "it", "we", "they", "their", "our",
        "his", "her", "its", "my", "your",
        "this", "that", "these", "those", "all", "not", "no", "nor",
        "very", "more", "most", "much", "many", "some", "any",
        "before", "after", "between", "during", "since", "until",
        "when", "where", "who", "which", "what", "how",
    }),
    "es": frozenset({
        "el", "la", "los", "las", "un", "una", "unos", "unas",
        "y", "o", "pero", "sin", "con", "en", "de", "del", "al",
        "a", "ante", "bajo", "desde", "entre", "hacia", "hasta",
        "para", "por", "sobre", "tras",
        "es", "son", "era", "eran", "fue", "fueron", "ser", "estar",
        "ha", "han", "he", "tener", "tiene",
        "yo", "su", "sus", "mi", "tu",
        "este", "esta", "estos", "estas", "ese", "esa",
        "no", "muy", "todo", "todos", "toda",
        "que", "cuando", "donde", "como",
        "antes", "después", "durante",
    }),
}
|
||||
|
||||
# ── Year regex ────────────────────────────────────────────────────────────────
|
||||
|
||||
_YEAR_RE = re.compile(r"\b(\d{4})\b")
|
||||
_WORD_RE = re.compile(r"\b[^\W\d_]{3,}\b", re.UNICODE)
|
||||
|
||||
|
||||
def _year_end(d: date) -> date:
    """Map January 1st to December 31st of the same year; pass other dates through.

    A January 1st value is treated as a year-only boundary, so its upper
    bound is the last day of that year.
    """
    is_new_years_day = (d.month, d.day) == (1, 1)
    return date(d.year, 12, 31) if is_new_years_day else d
|
||||
# ── Step 1 + 2: Person extraction and role detection ─────────────────────────
|
||||
|
||||
def _extract_persons_and_role(
    query: str,
    lang: str,
) -> tuple[list[str], str]:
    """Find person names in *query* and decide the role of a lone match.

    Returns ``(person_names, role)``. With no matcher installed, or an
    empty one, the result is ``([], "any")``. The role is 'sender' or
    'receiver' only when exactly one person matched and the preposition
    reported with it belongs to the corresponding set; otherwise 'any'.
    """
    matcher = _matcher
    if matcher is None or len(matcher) == 0:
        return [], "any"

    person_preps = _ALL_PERSON_PREPS[lang]
    # Person prepositions plus all date-direction words are passed as stop tokens.
    stop_tokens = (
        person_preps | _DATE_BEFORE[lang] | _DATE_AFTER[lang] | _DATE_BETWEEN[lang]
    )
    found = matcher.find_in_query(query, person_preps, stop_tokens=stop_tokens)
    names = [name for name, _prep in found]

    role = "any"
    if len(found) == 1:
        prep = found[0][1]
        if prep is not None:
            if prep in _SENDER_PREPS[lang]:
                role = "sender"
            elif prep in _RECEIVER_PREPS[lang]:
                role = "receiver"
    return names, role
|
||||
|
||||
|
||||
def _find_year_spans(doc) -> list:
    """Fallback for documents in which NER produced no DATE entities.

    Returns one single-token slice of *doc* for every token whose text
    matches ``_YEAR_RE`` and whose integer value lies strictly between
    1000 and 3000.
    """
    return [
        doc[tok.i : tok.i + 1]
        for tok in doc
        if _YEAR_RE.match(tok.text) and 1000 < int(tok.text) < 3000
    ]
|
||||
# ── Step 3: Date extraction ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_dates(doc, lang: str) -> tuple[str | None, str | None]:
|
||||
"""Return (date_from, date_to) as ISO strings or None."""
|
||||
date_spans = [ent for ent in doc.ents if ent.label_ == "DATE"]
|
||||
|
||||
# Fallback: some spaCy small models (de, es) don't tag bare years as DATE
|
||||
if not date_spans:
|
||||
date_spans = _find_year_spans(doc)
|
||||
|
||||
if not date_spans:
|
||||
return None, None
|
||||
|
||||
between_tokens = _DATE_BETWEEN[lang]
|
||||
before_tokens = _DATE_BEFORE[lang]
|
||||
after_tokens = _DATE_AFTER[lang]
|
||||
|
||||
# "zwischen X und Y" / "between X and Y" — two DATE spans form a range
|
||||
has_between = any(tok.lower_ in between_tokens for tok in doc)
|
||||
if has_between and len(date_spans) >= 2:
|
||||
parsed = []
|
||||
for span in date_spans[:2]:
|
||||
d = _parse_date_text(span.text, lang)
|
||||
if d:
|
||||
parsed.append(d)
|
||||
if len(parsed) == 2:
|
||||
parsed.sort()
|
||||
return parsed[0].isoformat(), _year_end(parsed[1]).isoformat()
|
||||
|
||||
# Single DATE span — use direction token
|
||||
span = date_spans[0]
|
||||
d = _parse_date_text(span.text, lang)
|
||||
if not d:
|
||||
return None, None
|
||||
|
||||
# Check up to 2 tokens before the date span to handle multi-word prepositions
|
||||
# like Spanish "antes de 1920" where the keyword is 2 tokens back.
|
||||
prev_tokens = [
|
||||
doc[span.start - i].lower_
|
||||
for i in range(1, min(3, span.start + 1))
|
||||
def _find_years(query: str) -> list[tuple[int, int, int]]:
    """Locate year tokens in *query*.

    Returns ``(start, end, year)`` for every ``_YEAR_RE`` match whose
    integer value lies strictly between 1000 and 3000, in order of
    appearance. ``start``/``end`` are character offsets into *query*.
    """
    hits: list[tuple[int, int, int]] = []
    for match in _YEAR_RE.finditer(query):
        year = int(match.group())
        if 1000 < year < 3000:
            hits.append((match.start(), match.end(), year))
    return hits
|
||||
|
||||
if any(t in before_tokens for t in prev_tokens):
|
||||
return None, _year_end(d).isoformat()
|
||||
if any(t in after_tokens for t in prev_tokens):
|
||||
return d.isoformat(), None
|
||||
# Bare year/date — closed year-range
|
||||
return d.isoformat(), _year_end(d).isoformat()
|
||||
|
||||
def _direction_before_year(
    query: str,
    year_start: int,
    lang: str,
    person_names: list[str],
) -> str:
    """Decide whether the year at *year_start* is 'before', 'after' or 'bare'.

    Only the last two whitespace-separated words in front of the year are
    considered, nearest first, compared in lowercase. A direction word
    counts only if the word directly before the year is not part of a
    matched person name; otherwise the direction word is taken to belong
    to that person and the result is 'bare'.
    """
    words_before = query[:year_start].split()
    if not words_before:
        return "bare"

    # Lowercased individual words of every matched person name.
    name_parts: set[str] = set()
    for name in person_names:
        name_parts.update(part.lower() for part in name.split())

    window = [word.lower() for word in words_before[-2:]]
    before_words = _DATE_BEFORE[lang]
    after_words = _DATE_AFTER[lang]
    adjacent_is_person = window[-1] in name_parts

    for word in window[::-1]:  # nearest word first
        if word in before_words:
            found = "before"
        elif word in after_words:
            found = "after"
        else:
            continue
        return "bare" if adjacent_is_person else found

    return "bare"
|
||||
|
||||
|
||||
# ── Step 4: Keyword extraction ───────────────────────────────────────────────
|
||||
def extract_dates(
|
||||
query: str,
|
||||
lang: str,
|
||||
person_names: list[str] | None = None,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Return (date_from, date_to) as ISO strings or None."""
|
||||
if person_names is None:
|
||||
person_names = []
|
||||
|
||||
def extract_keywords(doc, excluded_spans: list) -> list[str]:
|
||||
"""Return lowercased lemmas of content words not inside any NER span."""
|
||||
excluded_indices: set[int] = set()
|
||||
for span in excluded_spans:
|
||||
excluded_indices.update(range(span.start, span.end))
|
||||
year_spans = _find_years(query)
|
||||
if not year_spans:
|
||||
return None, None
|
||||
|
||||
# "zwischen X und Y" / "between X and Y" — two years form a range
|
||||
query_lower = query.lower()
|
||||
if any(w in query_lower.split() for w in _DATE_BETWEEN[lang]) and len(year_spans) >= 2:
|
||||
years = sorted([y for _, _, y in year_spans[:2]])
|
||||
return date(years[0], 1, 1).isoformat(), date(years[1], 12, 31).isoformat()
|
||||
|
||||
start, end, year = year_spans[0]
|
||||
direction = _direction_before_year(query, start, lang, person_names)
|
||||
|
||||
if direction == "before":
|
||||
return None, date(year, 12, 31).isoformat()
|
||||
if direction == "after":
|
||||
return date(year, 1, 1).isoformat(), None
|
||||
# bare year → closed year range
|
||||
return date(year, 1, 1).isoformat(), date(year, 12, 31).isoformat()
|
||||
|
||||
|
||||
# ── Step 4: Keyword extraction ────────────────────────────────────────────────
|
||||
|
||||
def extract_keywords(
|
||||
query: str,
|
||||
lang: str,
|
||||
person_spans: list[str],
|
||||
year_strings: list[str],
|
||||
) -> list[str]:
|
||||
"""Return lowercased content words after removing persons, years, stopwords."""
|
||||
text = query
|
||||
|
||||
# Remove matched person spans (longest first to avoid partial replacements)
|
||||
for span in sorted(person_spans, key=len, reverse=True):
|
||||
text = re.sub(
|
||||
r"(?<!\w)" + re.escape(span) + r"(?!\w)",
|
||||
" ",
|
||||
text,
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Remove year tokens
|
||||
for yr in year_strings:
|
||||
text = re.sub(r"\b" + re.escape(yr) + r"\b", " ", text)
|
||||
|
||||
stopwords = _STOPWORDS.get(lang, frozenset())
|
||||
seen: set[str] = set()
|
||||
keywords: list[str] = []
|
||||
for token in doc:
|
||||
if token.i in excluded_indices:
|
||||
continue
|
||||
if token.pos_ not in ("NOUN", "PROPN"):
|
||||
continue
|
||||
if token.is_stop:
|
||||
continue
|
||||
lemma = token.lemma_.lower()
|
||||
if len(lemma) < 3:
|
||||
continue
|
||||
if lemma not in seen:
|
||||
seen.add(lemma)
|
||||
keywords.append(lemma)
|
||||
result: list[str] = []
|
||||
|
||||
return keywords
|
||||
for tok in _WORD_RE.findall(text):
|
||||
lower = tok.lower()
|
||||
if lower in stopwords or lower in seen:
|
||||
continue
|
||||
seen.add(lower)
|
||||
result.append(lower)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ── Step 5: Assembly ─────────────────────────────────────────────────────────
|
||||
# ── Step 5: Assembly ──────────────────────────────────────────────────────────
|
||||
|
||||
def extract(query: str, lang: str) -> ParseResponse:
|
||||
"""Run the full NLP pipeline and return a ParseResponse."""
|
||||
nlp = get_nlp(lang)
|
||||
doc = nlp(query)
|
||||
|
||||
per_spans = [ent for ent in doc.ents if ent.label_ == "PER"]
|
||||
|
||||
person_names = extract_person_names(doc)
|
||||
person_role = detect_person_role(doc, per_spans, lang)
|
||||
date_from, date_to = extract_dates(doc, lang)
|
||||
keywords = extract_keywords(doc, list(doc.ents))
|
||||
"""Run the full rule-based pipeline and return a ParseResponse."""
|
||||
person_names, person_role = _extract_persons_and_role(query, lang)
|
||||
year_strings = [str(y) for _, _, y in _find_years(query)]
|
||||
date_from, date_to = extract_dates(query, lang, person_names)
|
||||
keywords = extract_keywords(query, lang, person_names, year_strings)
|
||||
|
||||
return ParseResponse(
|
||||
personNames=person_names,
|
||||
|
||||
Reference in New Issue
Block a user