"""Rule-based NLP pipeline: dates via regex, persons via DB-backed matcher."""
from __future__ import annotations

import re
from datetime import date
from typing import TYPE_CHECKING

import dateparser

from models import ParseResponse
from person_matcher import PersonMatcher

if TYPE_CHECKING:
    # Placeholder for type-only imports; currently empty.
    pass

# ── Module-level PersonMatcher (set at startup) ───────────────────────────────

# Matcher shared by the whole module; stays None until set_person_matcher() runs.
_matcher: PersonMatcher | None = None


def set_person_matcher(m: PersonMatcher) -> None:
    """Install *m* as the module-wide person matcher, replacing any previous one."""
    global _matcher
    _matcher = m


def get_person_matcher() -> PersonMatcher | None:
    """Return the installed person matcher, or ``None`` if none has been set."""
    return _matcher


# ── Preposition sets ──────────────────────────────────────────────────────────

# Per-language prepositions; a person introduced by one of these is classified
# as the sender.
_SENDER_PREPS: dict[str, frozenset[str]] = {
    "de": frozenset({"von", "vom"}),
    "en": frozenset({"from", "by"}),
    "es": frozenset({"de", "por"}),
}

# Per-language prepositions; a person introduced by one of these is classified
# as the receiver.
_RECEIVER_PREPS: dict[str, frozenset[str]] = {
    "de": frozenset({"an", "nach", "für"}),
    "en": frozenset({"to", "for"}),
    "es": frozenset({"para", "a"}),
}

# Union of sender and receiver prepositions for each supported language.
_ALL_PERSON_PREPS: dict[str, frozenset[str]] = {
    lang: _SENDER_PREPS[lang] | _RECEIVER_PREPS[lang]
    for lang in ("de", "en", "es")
}

# ── Date direction tokens ─────────────────────────────────────────────────────

# Words that turn a following year into an upper bound ("before 1900").
_DATE_BEFORE: dict[str, frozenset[str]] = {
    "de": frozenset({"vor"}),
    "en": frozenset({"before"}),
    "es": frozenset({"antes"}),
}

# Words that turn a following year into a lower bound ("after 1900").
# Note that German "nach" is also listed as a receiver preposition above.
_DATE_AFTER: dict[str, frozenset[str]] = {
    "de": frozenset({"nach"}),
    "en": frozenset({"after"}),
    "es": frozenset({"después", "despues"}),
}

# Words that introduce a two-year range ("between 1900 and 1910").
_DATE_BETWEEN: dict[str, frozenset[str]] = {
    "de": frozenset({"zwischen"}),
    "en": frozenset({"between"}),
    "es": frozenset({"entre"}),
}

# ── Stopword lists ────────────────────────────────────────────────────────────

# Per-language stopwords.  Each word is listed exactly once; the literals
# previously repeated several entries, which a set silently collapses.
# NOTE(review): "Jahr" / "Jahren" are capitalised, so they only match if the
# stopword lookup is case-sensitive — confirm against the keyword extractor.
_STOPWORDS: dict[str, frozenset[str]] = {
    "de": frozenset({
        "der", "die", "das", "des", "dem", "den",
        "ein", "eine", "einem", "einen", "einer", "eines",
        "er", "sie", "es", "wir", "ihr", "ich", "du",
        "und", "oder", "aber", "doch", "auch", "noch", "nur",
        "in", "an", "auf", "aus", "bei", "mit", "nach", "von", "vom", "vor",
        "zu", "zur", "zum", "durch", "für",
        "über", "unter", "zwischen", "gegen", "ohne", "um", "bis", "seit",
        "wegen",
        "ist", "sind", "war", "waren", "wird", "werden",
        "hat", "haben", "hatte", "hatten",
        "sein", "seine", "seinen", "seiner", "seines",
        "ihre", "ihren", "ihrer", "ihrem", "ihres",
        "nicht", "kein", "keine", "keinen", "keinem", "keines",
        "so", "wie", "als", "da", "hier", "dort", "wo", "wer", "was",
        "im", "am", "beim", "ins", "ans",
        "ja", "nein", "denn", "wenn", "weil", "dass", "ob", "damit",
        "alle", "alles", "mehr", "sehr", "viel", "wenig",
        "diesem", "dieser", "dieses", "diese", "diesen",
        "jetzt", "dann", "nun", "schon", "wohl",
        "wurde", "wurden", "worden", "geschrieben",
        "Jahr", "Jahren", "jahre", "jahr",
    }),
    "en": frozenset({
        "the", "a", "an", "and", "or", "but",
        "in", "on", "at", "to", "for", "of", "with", "by", "from", "about",
        "as", "into", "through",
        "is", "are", "was", "were", "be", "been", "being",
        "have", "has", "had", "do", "does", "did",
        "will", "would", "could", "should", "may", "might", "must", "shall",
        "can",
        "i", "you", "he", "she", "it", "we", "they",
        "their", "our", "his", "her", "its", "my", "your",
        "this", "that", "these", "those",
        "all", "not", "no", "nor", "very", "more", "most", "much", "many",
        "some", "any",
        "before", "after", "between", "during", "since", "until",
        "when", "where", "who", "which", "what", "how",
    }),
    "es": frozenset({
        "el", "la", "los", "las", "un", "una", "unos", "unas",
        "y", "o", "pero", "sin", "con", "en", "de", "del", "al", "a",
        "ante", "bajo", "desde", "entre", "hacia", "hasta", "para", "por",
        "sobre", "tras",
        "es", "son", "era", "eran", "fue", "fueron", "ser", "estar",
        "ha", "han", "he", "tener", "tiene",
        "yo", "su", "sus", "mi", "tu",
        "este", "esta", "estos", "estas", "ese", "esa",
        "no", "muy", "todo", "todos", "toda",
        "que", "cuando", "donde", "como",
        "antes", "después", "durante",
    }),
}

# ── Year regex
──────────────────────────────────────────────────────────────── _YEAR_RE = re.compile(r"\b(\d{4})\b") _WORD_RE = re.compile(r"\b[^\W\d_]{3,}\b", re.UNICODE) # ── Step 1 + 2: Person extraction and role detection ───────────────────────── def _extract_persons_and_role( query: str, lang: str, ) -> tuple[list[str], str]: """Return (person_names, role) using the DB-backed PersonMatcher.""" m = _matcher if m is None or len(m) == 0: return [], "any" preps = _ALL_PERSON_PREPS[lang] stops = preps | _DATE_BEFORE[lang] | _DATE_AFTER[lang] | _DATE_BETWEEN[lang] matches = m.find_in_query(query, preps, stop_tokens=stops) person_names = [text for text, _ in matches] if len(matches) != 1: return person_names, "any" _, prep = matches[0] if prep is None: return person_names, "any" if prep in _SENDER_PREPS[lang]: return person_names, "sender" if prep in _RECEIVER_PREPS[lang]: return person_names, "receiver" return person_names, "any" # ── Step 3: Date extraction ─────────────────────────────────────────────────── def _find_years(query: str) -> list[tuple[int, int, int]]: """Return list of (start, end, year_int) for valid 4-digit year tokens.""" return [ (m.start(), m.end(), int(m.group())) for m in _YEAR_RE.finditer(query) if 1000 < int(m.group()) < 3000 ] def _direction_before_year( query: str, year_start: int, lang: str, person_names: list[str], ) -> str: """Classify direction of the date span as 'before', 'after', or 'bare'. Looks at the two tokens immediately preceding the year. If the closer token is a matched person name part, the direction word belongs to that person — not to the year — so we return 'bare'. 
""" prefix_words = query[:year_start].split() if not prefix_words: return "bare" person_tokens = {w.lower() for name in person_names for w in name.split()} recent = [w.lower() for w in prefix_words[-2:]] before_set = _DATE_BEFORE[lang] after_set = _DATE_AFTER[lang] for direction_tok in reversed(recent): # closest first if direction_tok in before_set: # Only use this if the word immediately before the year is not a person if recent[-1] in person_tokens: return "bare" return "before" if direction_tok in after_set: if recent[-1] in person_tokens: return "bare" return "after" return "bare" def extract_dates( query: str, lang: str, person_names: list[str] | None = None, ) -> tuple[str | None, str | None]: """Return (date_from, date_to) as ISO strings or None.""" if person_names is None: person_names = [] year_spans = _find_years(query) if not year_spans: return None, None # "zwischen X und Y" / "between X and Y" — two years form a range query_lower = query.lower() if any(w in query_lower.split() for w in _DATE_BETWEEN[lang]) and len(year_spans) >= 2: years = sorted([y for _, _, y in year_spans[:2]]) return date(years[0], 1, 1).isoformat(), date(years[1], 12, 31).isoformat() start, end, year = year_spans[0] direction = _direction_before_year(query, start, lang, person_names) if direction == "before": return None, date(year, 12, 31).isoformat() if direction == "after": return date(year, 1, 1).isoformat(), None # bare year → closed year range return date(year, 1, 1).isoformat(), date(year, 12, 31).isoformat() # ── Step 4: Keyword extraction ──────────────────────────────────────────────── def extract_keywords( query: str, lang: str, person_spans: list[str], year_strings: list[str], ) -> list[str]: """Return lowercased content words after removing persons, years, stopwords.""" text = query # Remove matched person spans (longest first to avoid partial replacements) for span in sorted(person_spans, key=len, reverse=True): text = re.sub( r"(? 
ParseResponse: """Run the full rule-based pipeline and return a ParseResponse.""" person_names, person_role = _extract_persons_and_role(query, lang) year_strings = [str(y) for _, _, y in _find_years(query)] date_from, date_to = extract_dates(query, lang, person_names) keywords = extract_keywords(query, lang, person_names, year_strings) return ParseResponse( personNames=person_names, personRole=person_role, dateFrom=date_from, dateTo=date_to, keywords=keywords, rawQuery=query, )