Files
familienarchiv/nlp-service/extractor.py

198 lines
6.1 KiB
Python

from __future__ import annotations
import re
from datetime import date
import dateparser
import spacy
from spacy.language import Language
from models import ParseResponse
# ── Language model registry ──────────────────────────────────────────────────
_MODEL_NAMES: dict[str, str] = {
"de": "de_core_news_sm",
"en": "en_core_web_sm",
"es": "es_core_news_sm",
}
_nlp_cache: dict[str, Language] = {}
def get_nlp(lang: str) -> Language:
if lang not in _MODEL_NAMES:
raise ValueError(f"Unsupported language: {lang!r}. Valid: {list(_MODEL_NAMES)}")
if lang not in _nlp_cache:
_nlp_cache[lang] = spacy.load(_MODEL_NAMES[lang])
return _nlp_cache[lang]
def load_all_models() -> None:
for lang in _MODEL_NAMES:
get_nlp(lang)
# ── Step 1: Person name extraction ──────────────────────────────────────────
def extract_person_names(doc) -> list[str]:
"""Return PER entity texts in left-to-right span order."""
return [ent.text for ent in doc.ents if ent.label_ == "PER"]
# ── Step 2: Role detection ───────────────────────────────────────────────────
_SENDER_PREPS: dict[str, frozenset[str]] = {
"de": frozenset({"von", "vom"}),
"en": frozenset({"from", "by"}),
"es": frozenset({"de", "por"}),
}
_RECEIVER_PREPS: dict[str, frozenset[str]] = {
"de": frozenset({"an", "nach", "für"}),
"en": frozenset({"to", "for"}),
"es": frozenset({"para", "a"}),
}
def detect_person_role(doc, per_spans: list, lang: str) -> str:
"""Return 'sender', 'receiver', or 'any'.
Only meaningful for single-PER queries — two-person queries always return
'any' because Java derives direction from list position.
"""
if len(per_spans) != 1:
return "any"
span = per_spans[0]
root = span.root
sender = _SENDER_PREPS[lang]
receiver = _RECEIVER_PREPS[lang]
# Primary: dependency-tree children of the PER root
for child in root.children:
if child.dep_ in ("case", "prep", "mo"):
if child.lower_ in sender:
return "sender"
if child.lower_ in receiver:
return "receiver"
# Fallback: token immediately before the span start
if span.start > 0:
prev = doc[span.start - 1]
if prev.lower_ in sender:
return "sender"
if prev.lower_ in receiver:
return "receiver"
return "any"
# ── Step 3: Date parsing ─────────────────────────────────────────────────────
_YEAR_RE = re.compile(r"^\d{4}$")
_DATE_BEFORE: dict[str, frozenset[str]] = {
"de": frozenset({"vor"}),
"en": frozenset({"before"}),
"es": frozenset({"antes"}),
}
_DATE_AFTER: dict[str, frozenset[str]] = {
"de": frozenset({"nach"}),
"en": frozenset({"after"}),
"es": frozenset({"después", "despues"}),
}
_DATE_BETWEEN: dict[str, frozenset[str]] = {
"de": frozenset({"zwischen"}),
"en": frozenset({"between"}),
"es": frozenset({"entre"}),
}
def _parse_date_text(text: str, lang: str) -> date | None:
text = text.strip()
if _YEAR_RE.match(text):
year = int(text)
if 1000 < year < 3000:
return date(year, 1, 1)
parsed = dateparser.parse(
text,
languages=[lang],
settings={"PREFER_DAY_OF_MONTH": "first", "RETURN_AS_TIMEZONE_AWARE": False},
)
return parsed.date() if parsed else None
def _year_end(d: date) -> date:
"""If d is Jan 1, return Dec 31 of the same year (year-only boundary)."""
if d.month == 1 and d.day == 1:
return date(d.year, 12, 31)
return d
def extract_dates(doc, lang: str) -> tuple[str | None, str | None]:
"""Return (date_from, date_to) as ISO strings or None."""
date_spans = [ent for ent in doc.ents if ent.label_ == "DATE"]
if not date_spans:
return None, None
between_tokens = _DATE_BETWEEN[lang]
before_tokens = _DATE_BEFORE[lang]
after_tokens = _DATE_AFTER[lang]
# "zwischen X und Y" / "between X and Y" — two DATE spans form a range
has_between = any(tok.lower_ in between_tokens for tok in doc)
if has_between and len(date_spans) >= 2:
parsed = []
for span in date_spans[:2]:
d = _parse_date_text(span.text, lang)
if d:
parsed.append(d)
if len(parsed) == 2:
parsed.sort()
return parsed[0].isoformat(), _year_end(parsed[1]).isoformat()
# Single DATE span — use direction token
span = date_spans[0]
d = _parse_date_text(span.text, lang)
if not d:
return None, None
prev_lower = doc[span.start - 1].lower_ if span.start > 0 else ""
if prev_lower in before_tokens:
return None, _year_end(d).isoformat()
if prev_lower in after_tokens:
return d.isoformat(), None
# Bare year/date — closed year-range
return d.isoformat(), _year_end(d).isoformat()
# ── Step 4: Keyword extraction ───────────────────────────────────────────────
def extract_keywords(doc, excluded_spans: list) -> list[str]:
"""Return lowercased lemmas of content words not inside any NER span."""
excluded_indices: set[int] = set()
for span in excluded_spans:
excluded_indices.update(range(span.start, span.end))
seen: set[str] = set()
keywords: list[str] = []
for token in doc:
if token.i in excluded_indices:
continue
if token.pos_ not in ("NOUN", "PROPN"):
continue
if token.is_stop:
continue
lemma = token.lemma_.lower()
if len(lemma) < 3:
continue
if lemma not in seen:
seen.add(lemma)
keywords.append(lemma)
return keywords