"""German spell-check post-processing for OCR output."""

import logging
import os

from spellchecker import SpellChecker

from confidence import CORRECTION_MARKER, ILLEGIBLE_MARKER, _collapse_adjacent_markers

logger = logging.getLogger(__name__)

# Words shorter than this are exempt from spell checking
# (particles, abbreviations such as "u.", "z.B.").
_MIN_SPELL_CHECK_LEN = 4

# Minimum dictionary frequency a suggested correction must have to be
# accepted; rarer suggestions are considered unreliable and the original
# word is marked illegible instead.
_MIN_CORRECTION_FREQ = 50

# Module-level singleton, populated by load_spell_checker().
_spell: SpellChecker | None = None


def load_spell_checker() -> None:
    """Load German spell checker with supplementary historical wordlist.

    Safe to call multiple times — no-op if already loaded.
    """
    global _spell
    if _spell is not None:
        return

    logger.info("Loading German spell checker...")
    _spell = SpellChecker(language="de")

    historical_path = os.path.join(os.path.dirname(__file__), "dictionaries", "de_historical.txt")
    if os.path.exists(historical_path):
        with open(historical_path, encoding="utf-8") as f:
            words = [
                line.strip()
                for line in f
                if line.strip() and not line.startswith("#")
            ]
        _spell.word_frequency.load_words(words)
        # NOTE(review): load_words() registers each word with frequency 1,
        # which is below _MIN_CORRECTION_FREQ — historical words pass the
        # known() check but will never be offered as corrections by
        # correct_text(). Confirm this asymmetry is intended.
        logger.info("Loaded %d historical German words", len(words))
    else:
        logger.warning("Historical German wordlist not found at %s", historical_path)

    logger.info("German spell checker ready")


def _strip_punctuation(token: str) -> tuple[str, str, str]:
    """Split token into (leading_punct, word, trailing_punct).

    'Word' characters are letters (including German umlauts) and digits.
    Everything else is treated as punctuation.
    """
    start = 0
    while start < len(token) and not (token[start].isalpha() or token[start].isdigit()):
        start += 1

    end = len(token)
    while end > start and not (token[end - 1].isalpha() or token[end - 1].isdigit()):
        end -= 1

    return token[:start], token[start:end], token[end:]


def _is_numeric(word: str) -> bool:
    """Return True if *word* contains at least one digit.

    Note: any-digit semantics, not "is a number" — mixed tokens such as
    dates ("17.") or amounts ("3Mark") are exempted from spell checking.
    """
    return any(c.isdigit() for c in word)


def correct_text(text: str) -> str:
    """Spell-check OCR text, correcting errors and marking gibberish as [unleserlich].

    Already-present [unleserlich] tokens are preserved unchanged.
    Words of fewer than 4 characters are exempt (particles, abbreviations).
    Tokens containing digits pass through unchanged.
    Adjacent [unleserlich] markers are collapsed into one.
    Corrected tokens are marked with [?] (e.g. "Hauus" → "Haus[?]").

    Args:
        text: OCR output, possibly already containing [unleserlich] from confidence filtering.

    Returns:
        Corrected text with unresolvable words replaced by [unleserlich].

    Raises:
        RuntimeError: If load_spell_checker() has not been called yet.
    """
    if _spell is None:
        raise RuntimeError("Spell checker not loaded — call load_spell_checker() first")

    if not text.strip():
        return text

    tokens = text.split()
    checked: list[str] = []

    for token in tokens:
        # Preserve markers inserted by the upstream confidence filter;
        # _collapse_adjacent_markers deduplicates runs at the end.
        if token == ILLEGIBLE_MARKER:
            checked.append(token)
            continue

        leading, word, trailing = _strip_punctuation(token)

        if len(word) < _MIN_SPELL_CHECK_LEN:
            checked.append(token)
            continue

        if _is_numeric(word):
            checked.append(token)
            continue

        # known() returns a (truthy) set when the word is in the dictionary.
        if _spell.known([word]):
            checked.append(token)
            continue

        correction = _spell.correction(word)
        # Only accept suggestions common enough to trust; an unknown word with
        # no plausible, frequent correction is marked illegible instead.
        if correction and _spell.word_frequency[correction] > _MIN_CORRECTION_FREQ:
            # The checker is case-insensitive; restore the original
            # capitalization (German nouns are capitalized).
            if word[0].isupper() and not correction[0].isupper():
                correction = correction.capitalize()
            checked.append(leading + correction + CORRECTION_MARKER + trailing)
        else:
            checked.append(ILLEGIBLE_MARKER)

    return " ".join(_collapse_adjacent_markers(checked))