feat(ocr): add spell_check module with German spellchecker and historical wordlist
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
117
ocr-service/spell_check.py
Normal file
117
ocr-service/spell_check.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""German spell-check post-processing for OCR output."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from spellchecker import SpellChecker
|
||||
|
||||
from confidence import CORRECTION_MARKER, ILLEGIBLE_MARKER, _collapse_adjacent_markers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Words with fewer characters than this (after surrounding punctuation is
# stripped) are passed through by correct_text() without being spell-checked.
_MIN_SPELL_CHECK_LEN = 4
|
||||
|
||||
# Module-wide spell checker instance; stays None until load_spell_checker()
# has been called. correct_text() raises RuntimeError while it is None.
_spell: SpellChecker | None = None
|
||||
|
||||
|
||||
def load_spell_checker() -> None:
    """Load German spell checker with supplementary historical wordlist.

    Safe to call multiple times — no-op if already loaded.

    The wordlist is read from ``dictionaries/de_historical.txt`` next to this
    module: one word per line, blank lines and lines whose first
    non-whitespace character is ``#`` are ignored. A missing wordlist is
    logged as a warning and the plain German dictionary is used.

    The module-level checker is published only after loading has finished,
    so an exception while reading the wordlist does not leave behind a
    checker without historical words that later calls would treat as
    fully loaded.
    """
    global _spell
    if _spell is not None:
        return

    logger.info("Loading German spell checker...")
    # Build into a local first; assign to the global only on success.
    spell = SpellChecker(language="de")

    historical_path = os.path.join(os.path.dirname(__file__), "dictionaries", "de_historical.txt")
    if os.path.exists(historical_path):
        with open(historical_path, encoding="utf-8") as f:
            # Strip before testing for "#" so indented comment lines are
            # skipped instead of being loaded as dictionary words.
            entries = (line.strip() for line in f)
            words = [entry for entry in entries if entry and not entry.startswith("#")]
        spell.word_frequency.load_words(words)
        logger.info("Loaded %d historical German words", len(words))
    else:
        logger.warning("Historical German wordlist not found at %s", historical_path)

    _spell = spell
    logger.info("German spell checker ready")
|
||||
|
||||
|
||||
def _strip_punctuation(token: str) -> tuple[str, str, str]:
|
||||
"""Split token into (leading_punct, word, trailing_punct).
|
||||
|
||||
'Word' characters are letters (including German umlauts) and digits.
|
||||
Everything else is treated as punctuation.
|
||||
"""
|
||||
start = 0
|
||||
while start < len(token) and not (token[start].isalpha() or token[start].isdigit()):
|
||||
start += 1
|
||||
|
||||
end = len(token)
|
||||
while end > start and not (token[end - 1].isalpha() or token[end - 1].isdigit()):
|
||||
end -= 1
|
||||
|
||||
return token[:start], token[start:end], token[end:]
|
||||
|
||||
|
||||
def _is_numeric(word: str) -> bool:
|
||||
return any(c.isdigit() for c in word)
|
||||
|
||||
|
||||
def correct_text(text: str) -> str:
    """Spell-check OCR text, fixing misspellings and flagging gibberish.

    The text is split on whitespace and each token is handled on its own:

    * a token equal to the [unleserlich] marker is kept as is;
    * surrounding punctuation is set aside, and words shorter than
      ``_MIN_SPELL_CHECK_LEN`` characters (particles, abbreviations) or
      containing a digit pass through unchanged;
    * words the spell checker knows pass through unchanged;
    * otherwise, a sufficiently frequent suggestion replaces the word and is
      tagged with [?] (e.g. "Hauus" → "Haus[?]"), keeping a leading capital;
    * anything else is replaced by [unleserlich].

    Adjacent [unleserlich] markers are collapsed into one, and the tokens are
    re-joined with single spaces (original whitespace is not preserved).

    Args:
        text: OCR output, possibly already containing [unleserlich] from
            confidence filtering.

    Returns:
        Corrected text with unresolvable words replaced by [unleserlich].
        Empty or whitespace-only input is returned unchanged.

    Raises:
        RuntimeError: If load_spell_checker() has not been called yet.
    """
    if _spell is None:
        raise RuntimeError("Spell checker not loaded — call load_spell_checker() first")

    if not text.strip():
        return text

    def review(token: str) -> str:
        """Return the replacement for a single whitespace-delimited token."""
        if token == ILLEGIBLE_MARKER:
            return token

        leading, word, trailing = _strip_punctuation(token)

        # Exemptions are tested in order; the dictionary lookup runs last.
        if len(word) < _MIN_SPELL_CHECK_LEN or _is_numeric(word) or _spell.known([word]):
            return token

        suggestion = _spell.correction(word)
        # Accept a suggestion only if its dictionary frequency exceeds 50.
        if suggestion and _spell.word_frequency[suggestion] > 50:
            # Restore the leading capital of the original word.
            if word[0].isupper() and not suggestion[0].isupper():
                suggestion = suggestion.capitalize()
            return leading + suggestion + CORRECTION_MARKER + trailing

        return ILLEGIBLE_MARKER

    reviewed: list[str] = [review(token) for token in text.split()]
    return " ".join(_collapse_adjacent_markers(reviewed))
|
||||
Reference in New Issue
Block a user