Files
familienarchiv/ocr-service/engines/kraken.py
Marcel 64d27d6d61 feat(ocr): per-sender model registry and /train-sender endpoint
engines/kraken.py:
- Add _SenderModelRegistry with LRU eviction (max configurable via
  OCR_MAX_CACHED_MODELS env var), double-checked locking, invalidate(),
  and path whitelist (/app/models/ only)
- Add _load_sender_model() helper for testability
- extract_page_blocks() and extract_region_text() accept optional
  sender_model_path; route to sender registry when provided

models.py:
- OcrRequest gains senderModelPath: str | None = None field

main.py:
- /ocr and /ocr/stream pass request.senderModelPath to Kraken engine
- New /train-sender endpoint: validates output_model_path, runs ketos
  train with base model as starting point, invalidates sender cache

docker-compose.yml:
- Add OCR_MAX_CACHED_MODELS: "5" to ocr-service environment

test_sender_registry.py:
- 4 tests: cache hit, LRU eviction, invalidate, path traversal guard

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 18:05:39 +02:00

323 lines
10 KiB
Python

"""Kraken OCR engine wrapper — historical HTR model support for Kurrent/Suetterlin."""
import collections
import logging
import os
import threading
logger = logging.getLogger(__name__)
_model = None
_model_path = os.environ.get("KRAKEN_MODEL_PATH", "/app/models/german_kurrent.mlmodel")
_MODELS_DIR = os.path.realpath("/app/models")
_MAX_CACHED_SENDER_MODELS = int(os.environ.get("OCR_MAX_CACHED_MODELS", "5"))
def _load_sender_model(path: str):
"""Load a Kraken model from disk. Extracted for testability."""
from kraken.lib import models as kraken_models
return kraken_models.load_any(path)
class _SenderModelRegistry:
"""Thread-safe LRU cache for per-sender Kraken models.
Uses double-checked locking: model loading happens outside the lock to
avoid blocking concurrent OCR requests. At most one entry per path is
stored even under concurrent load.
"""
def __init__(self, max_size: int):
self._max_size = max_size
self._cache: collections.OrderedDict = collections.OrderedDict()
self._lock = threading.Lock()
def get_model(self, model_path: str):
"""Return the cached model or load it. Validates path is within /app/models/."""
resolved = os.path.realpath(model_path)
if not resolved.startswith(_MODELS_DIR + os.sep) and resolved != _MODELS_DIR:
raise ValueError(f"Sender model path not allowed: {model_path}")
with self._lock:
if model_path in self._cache:
self._cache.move_to_end(model_path)
return self._cache[model_path]
new_model = _load_sender_model(model_path)
with self._lock:
if model_path in self._cache:
self._cache.move_to_end(model_path)
return self._cache[model_path]
self._cache[model_path] = new_model
self._cache.move_to_end(model_path)
while len(self._cache) > self._max_size:
self._cache.popitem(last=False)
return new_model
def invalidate(self, model_path: str) -> None:
"""Remove model from cache so the next request reloads from disk."""
with self._lock:
self._cache.pop(model_path, None)
def size(self) -> int:
with self._lock:
return len(self._cache)
def _contains(self, model_path: str) -> bool:
with self._lock:
return model_path in self._cache
_sender_registry = _SenderModelRegistry(_MAX_CACHED_SENDER_MODELS)
def load_models():
"""Load the Kraken model at startup. Skips if model file is not present."""
global _model
if not os.path.exists(_model_path):
logger.warning("Kraken model not found at %s — Kurrent OCR will not be available", _model_path)
return
logger.info("Loading Kraken model from %s...", _model_path)
from kraken.lib import models as kraken_models
_model = kraken_models.load_any(_model_path)
logger.info("Kraken model loaded successfully")
def is_available() -> bool:
return _model is not None
def extract_page_blocks(image, page_idx: int, language: str = "de",
sender_model_path: str | None = None) -> list[dict]:
"""Run Kraken segmentation + recognition on a single PIL image.
Returns block dicts for that page. Coordinates are normalized to [0, 1].
When sender_model_path is provided, the per-sender fine-tuned model is used.
"""
from kraken import blla, rpred
from confidence import words_from_characters
if _model is None:
raise RuntimeError("Kraken model is not loaded")
active_model = _sender_registry.get_model(sender_model_path) if sender_model_path else _model
page_w, page_h = image.size
blocks = []
baseline_seg = blla.segment(image)
pred_it = rpred.rpred(active_model, image, baseline_seg)
for record in pred_it:
polygon_pts = record.boundary if hasattr(record, "boundary") and record.boundary else []
if polygon_pts:
xs = [p[0] for p in polygon_pts]
ys = [p[1] for p in polygon_pts]
x1, y1 = min(xs), min(ys)
x2, y2 = max(xs), max(ys)
else:
xs = [p[0] for p in record.baseline]
ys = [p[1] for p in record.baseline]
x1, y1 = min(xs), min(ys) - 5
x2, y2 = max(xs), max(ys) + 5
quad = _approximate_to_quad(polygon_pts, page_w, page_h) if polygon_pts else None
char_confidences = getattr(record, "confidences", [])
words = words_from_characters(record.prediction, char_confidences)
blocks.append({
"pageNumber": page_idx,
"x": x1 / page_w,
"y": y1 / page_h,
"width": (x2 - x1) / page_w,
"height": (y2 - y1) / page_h,
"polygon": quad,
"text": record.prediction,
"words": words,
})
return blocks
def extract_region_text(image, x: float, y: float, w: float, h: float,
sender_model_path: str | None = None) -> str:
"""Crop image to a normalized region and run Kraken recognition on the crop.
Used for guided OCR — skips full-page layout detection entirely.
A single synthetic baseline spanning the full crop width is used so that
blla.segment() (which crashes on small crops) is never called.
Coordinates are normalized to [0, 1].
When sender_model_path is provided, the per-sender fine-tuned model is used.
"""
from kraken import rpred
from kraken.containers import Segmentation, BaselineLine
if _model is None:
raise RuntimeError("Kraken model is not loaded")
active_model = _sender_registry.get_model(sender_model_path) if sender_model_path else _model
pw, ph = image.size
x1 = max(0, int(x * pw))
y1 = max(0, int(y * ph))
x2 = min(pw, int((x + w) * pw))
y2 = min(ph, int((y + h) * ph))
crop = image.crop((x1, y1, x2, y2))
cw, ch = crop.size
if cw == 0 or ch == 0:
return ""
# Single synthetic baseline at vertical midpoint, spanning full crop width.
# Kraken's bounds check is >= (not >), so all coordinates must be < image
# dimension — use cw-1 / ch-1 to stay strictly inside.
mid_y = ch // 2
synthetic_seg = Segmentation(
type="baselines",
imagename="",
text_direction="horizontal-lr",
script_detection=False,
lines=[
BaselineLine(
id="line0",
baseline=[(0, mid_y), (cw - 1, mid_y)],
boundary=[(0, 0), (cw - 1, 0), (cw - 1, ch - 1), (0, ch - 1)],
)
],
regions={},
line_orders=[],
)
pred_it = rpred.rpred(active_model, crop, synthetic_seg)
return " ".join(r.prediction for r in pred_it)
def extract_blocks(images: list, language: str = "de",
sender_model_path: str | None = None) -> list[dict]:
"""Run Kraken segmentation + recognition on a list of PIL images.
Returns block dicts with pageNumber, x, y, width, height, polygon, text.
Polygon is a 4-point quadrilateral approximation of the baseline polygon.
Coordinates are normalized to [0, 1].
"""
all_blocks = []
for page_idx, image in enumerate(images, start=1):
all_blocks.extend(extract_page_blocks(image, page_idx, language, sender_model_path))
return all_blocks
def _approximate_to_quad(points: list[tuple], page_w: float, page_h: float) -> list[list[float]] | None:
"""Approximate a polygon to a 4-point quadrilateral using the minimum bounding rectangle.
Uses gift-wrapping (Jarvis march) for convex hull, then rotating calipers
for the minimum area bounding rectangle. Pure Python, no scipy/numpy.
"""
if len(points) < 3:
return None
try:
hull = _convex_hull(points)
if len(hull) < 3:
return None
rect = _min_bounding_rect(hull)
# Normalize to [0, 1]
return [[p[0] / page_w, p[1] / page_h] for p in rect]
except Exception:
logger.debug("Failed to approximate polygon to quad, returning None")
return None
def _convex_hull(points: list[tuple]) -> list[tuple]:
"""Jarvis march (gift wrapping) algorithm for 2D convex hull."""
pts = list(set(points))
if len(pts) < 3:
return pts
# Start from leftmost point
start = min(pts, key=lambda p: (p[0], p[1]))
hull = []
current = start
while True:
hull.append(current)
candidate = pts[0]
for p in pts[1:]:
if candidate == current:
candidate = p
continue
cross = _cross(current, candidate, p)
if cross < 0:
candidate = p
elif cross == 0:
# Collinear — pick the farther point
if _dist_sq(current, p) > _dist_sq(current, candidate):
candidate = p
current = candidate
if current == start:
break
return hull
def _min_bounding_rect(hull: list[tuple]) -> list[tuple]:
"""Find the minimum area bounding rectangle of a convex hull using rotating calipers."""
n = len(hull)
if n < 2:
return hull
min_area = float("inf")
best_rect = None
for i in range(n):
# Edge vector
edge_x = hull[(i + 1) % n][0] - hull[i][0]
edge_y = hull[(i + 1) % n][1] - hull[i][1]
edge_len = (edge_x ** 2 + edge_y ** 2) ** 0.5
if edge_len == 0:
continue
# Unit vectors along and perpendicular to the edge
ux, uy = edge_x / edge_len, edge_y / edge_len
vx, vy = -uy, ux
# Project all hull points onto the edge coordinate system
projs_u = [p[0] * ux + p[1] * uy for p in hull]
projs_v = [p[0] * vx + p[1] * vy for p in hull]
min_u, max_u = min(projs_u), max(projs_u)
min_v, max_v = min(projs_v), max(projs_v)
area = (max_u - min_u) * (max_v - min_v)
if area < min_area:
min_area = area
# Reconstruct 4 corners in original coordinates
best_rect = [
(min_u * ux + min_v * vx, min_u * uy + min_v * vy),
(max_u * ux + min_v * vx, max_u * uy + min_v * vy),
(max_u * ux + max_v * vx, max_u * uy + max_v * vy),
(min_u * ux + max_v * vx, min_u * uy + max_v * vy),
]
return best_rect if best_rect else hull[:4]
def _cross(o: tuple, a: tuple, b: tuple) -> float:
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
def _dist_sq(a: tuple, b: tuple) -> float:
return (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2