"""Kraken OCR engine wrapper — historical HTR model support for Kurrent/Suetterlin.""" from __future__ import annotations import collections import logging import os import threading from typing import TYPE_CHECKING if TYPE_CHECKING: from PIL.Image import Image logger = logging.getLogger(__name__) _model = None _model_path = os.environ.get("KRAKEN_MODEL_PATH", "/app/models/german_kurrent.mlmodel") _MODELS_DIR = os.path.realpath("/app/models") _MAX_CACHED_SENDER_MODELS = int(os.environ.get("OCR_MAX_CACHED_MODELS", "5")) def _load_sender_model(path: str): """Load a Kraken model from disk. Extracted for testability.""" from kraken.lib import models as kraken_models return kraken_models.load_any(path) class _SenderModelRegistry: """Thread-safe LRU cache for per-sender Kraken models. Uses double-checked locking: model loading happens outside the lock to avoid blocking concurrent OCR requests. At most one entry per path is stored even under concurrent load. """ def __init__(self, max_size: int): self._max_size = max_size self._cache: collections.OrderedDict = collections.OrderedDict() self._lock = threading.Lock() def get_model(self, model_path: str): """Return the cached model or load it. Validates path is within /app/models/.""" resolved = os.path.realpath(model_path) if not resolved.startswith(_MODELS_DIR + os.sep) and resolved != _MODELS_DIR: raise ValueError(f"Sender model path not allowed: {model_path}") with self._lock: if model_path in self._cache: self._cache.move_to_end(model_path) return self._cache[model_path] new_model = _load_sender_model(model_path) with self._lock: if model_path in self._cache: self._cache.move_to_end(model_path) return self._cache[model_path] self._cache[model_path] = new_model self._cache.move_to_end(model_path) while len(self._cache) > self._max_size: self._cache.popitem(last=False) return new_model def invalidate(self, model_path: str) -> None: """Remove model from cache so the next request reloads from disk.""" with self._lock: self._cache.pop(model_path, None) def size(self) -> int: with self._lock: return len(self._cache) def _contains(self, model_path: str) -> bool: with self._lock: return model_path in self._cache _sender_registry = _SenderModelRegistry(_MAX_CACHED_SENDER_MODELS) def load_models(): """Load the Kraken model at startup. Skips if model file is not present.""" global _model if not os.path.exists(_model_path): logger.warning("Kraken model not found at %s — Kurrent OCR will not be available", _model_path) return logger.info("Loading Kraken model from %s...", _model_path) from kraken.lib import models as kraken_models _model = kraken_models.load_any(_model_path) logger.info("Kraken model loaded successfully") def is_available() -> bool: return _model is not None def extract_page_blocks(image: Image, page_idx: int, language: str = "de", sender_model_path: str | None = None) -> list[dict]: """Run Kraken segmentation + recognition on a single PIL image. Returns block dicts for that page. Coordinates are normalized to [0, 1]. When sender_model_path is provided, the per-sender fine-tuned model is used. """ from kraken import blla, rpred from confidence import words_from_characters if _model is None: raise RuntimeError("Kraken model is not loaded") active_model = _sender_registry.get_model(sender_model_path) if sender_model_path else _model page_w, page_h = image.size blocks = [] baseline_seg = blla.segment(image) pred_it = rpred.rpred(active_model, image, baseline_seg) for record in pred_it: polygon_pts = record.boundary if hasattr(record, "boundary") and record.boundary else [] if polygon_pts: xs = [p[0] for p in polygon_pts] ys = [p[1] for p in polygon_pts] x1, y1 = min(xs), min(ys) x2, y2 = max(xs), max(ys) else: xs = [p[0] for p in record.baseline] ys = [p[1] for p in record.baseline] x1, y1 = min(xs), min(ys) - 5 x2, y2 = max(xs), max(ys) + 5 quad = _approximate_to_quad(polygon_pts, page_w, page_h) if polygon_pts else None char_confidences = getattr(record, "confidences", []) words = words_from_characters(record.prediction, char_confidences) blocks.append({ "pageNumber": page_idx, "x": x1 / page_w, "y": y1 / page_h, "width": (x2 - x1) / page_w, "height": (y2 - y1) / page_h, "polygon": quad, "text": record.prediction, "words": words, }) return blocks def extract_region_text(image: Image, x: float, y: float, w: float, h: float, sender_model_path: str | None = None) -> str: """Crop image to a normalized region and run Kraken recognition on the crop. Used for guided OCR — skips full-page layout detection entirely. A single synthetic baseline spanning the full crop width is used so that blla.segment() (which crashes on small crops) is never called. Coordinates are normalized to [0, 1]. When sender_model_path is provided, the per-sender fine-tuned model is used. """ from kraken import rpred from kraken.containers import Segmentation, BaselineLine if _model is None: raise RuntimeError("Kraken model is not loaded") active_model = _sender_registry.get_model(sender_model_path) if sender_model_path else _model pw, ph = image.size x1 = max(0, int(x * pw)) y1 = max(0, int(y * ph)) x2 = min(pw, int((x + w) * pw)) y2 = min(ph, int((y + h) * ph)) crop = image.crop((x1, y1, x2, y2)) cw, ch = crop.size if cw == 0 or ch == 0: return "" # Single synthetic baseline at vertical midpoint, spanning full crop width. # Kraken's bounds check is >= (not >), so all coordinates must be < image # dimension — use cw-1 / ch-1 to stay strictly inside. mid_y = ch // 2 synthetic_seg = Segmentation( type="baselines", imagename="", text_direction="horizontal-lr", script_detection=False, lines=[ BaselineLine( id="line0", baseline=[(0, mid_y), (cw - 1, mid_y)], boundary=[(0, 0), (cw - 1, 0), (cw - 1, ch - 1), (0, ch - 1)], ) ], regions={}, line_orders=[], ) pred_it = rpred.rpred(active_model, crop, synthetic_seg) return " ".join(r.prediction for r in pred_it) def extract_blocks(images: list, language: str = "de", sender_model_path: str | None = None) -> list[dict]: """Run Kraken segmentation + recognition on a list of PIL images. Returns block dicts with pageNumber, x, y, width, height, polygon, text. Polygon is a 4-point quadrilateral approximation of the baseline polygon. Coordinates are normalized to [0, 1]. """ all_blocks = [] for page_idx, image in enumerate(images, start=1): all_blocks.extend(extract_page_blocks(image, page_idx, language, sender_model_path)) return all_blocks def _approximate_to_quad(points: list[tuple], page_w: float, page_h: float) -> list[list[float]] | None: """Approximate a polygon to a 4-point quadrilateral using the minimum bounding rectangle. Uses gift-wrapping (Jarvis march) for convex hull, then rotating calipers for the minimum area bounding rectangle. Pure Python, no scipy/numpy. """ if len(points) < 3: return None try: hull = _convex_hull(points) if len(hull) < 3: return None rect = _min_bounding_rect(hull) # Normalize to [0, 1] return [[p[0] / page_w, p[1] / page_h] for p in rect] except Exception: logger.debug("Failed to approximate polygon to quad, returning None") return None def _convex_hull(points: list[tuple]) -> list[tuple]: """Jarvis march (gift wrapping) algorithm for 2D convex hull.""" pts = list(set(points)) if len(pts) < 3: return pts # Start from leftmost point start = min(pts, key=lambda p: (p[0], p[1])) hull = [] current = start while True: hull.append(current) candidate = pts[0] for p in pts[1:]: if candidate == current: candidate = p continue cross = _cross(current, candidate, p) if cross < 0: candidate = p elif cross == 0: # Collinear — pick the farther point if _dist_sq(current, p) > _dist_sq(current, candidate): candidate = p current = candidate if current == start: break return hull def _min_bounding_rect(hull: list[tuple]) -> list[tuple]: """Find the minimum area bounding rectangle of a convex hull using rotating calipers.""" n = len(hull) if n < 2: return hull min_area = float("inf") best_rect = None for i in range(n): # Edge vector edge_x = hull[(i + 1) % n][0] - hull[i][0] edge_y = hull[(i + 1) % n][1] - hull[i][1] edge_len = (edge_x ** 2 + edge_y ** 2) ** 0.5 if edge_len == 0: continue # Unit vectors along and perpendicular to the edge ux, uy = edge_x / edge_len, edge_y / edge_len vx, vy = -uy, ux # Project all hull points onto the edge coordinate system projs_u = [p[0] * ux + p[1] * uy for p in hull] projs_v = [p[0] * vx + p[1] * vy for p in hull] min_u, max_u = min(projs_u), max(projs_u) min_v, max_v = min(projs_v), max(projs_v) area = (max_u - min_u) * (max_v - min_v) if area < min_area: min_area = area # Reconstruct 4 corners in original coordinates best_rect = [ (min_u * ux + min_v * vx, min_u * uy + min_v * vy), (max_u * ux + min_v * vx, max_u * uy + min_v * vy), (max_u * ux + max_v * vx, max_u * uy + max_v * vy), (min_u * ux + max_v * vx, min_u * uy + max_v * vy), ] return best_rect if best_rect else hull[:4] def _cross(o: tuple, a: tuple, b: tuple) -> float: return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) def _dist_sq(a: tuple, b: tuple) -> float: return (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2