Files
familienarchiv/ocr-service/engines/kraken.py
Marcel ee58b63517 feat(ocr): add guided OCR mode using existing annotation regions
When a document has manually drawn annotation boxes, the user can now
enable "Nur annotierte Bereiche" in the OCR trigger panel. The engine
skips layout detection entirely and runs recognition only within the
pre-drawn bounding boxes, preserving manual transcription blocks.

- Python: adds OcrRegion model, extend OcrRequest/OcrBlock; guided
  branch in /ocr/stream groups by page and crops each region
- Engines: add extract_region_text() to both Kraken and Surya
- Java: adds OcrBlockResult.annotationId, OcrClient.OcrRegion,
  TriggerOcrDTO.useExistingAnnotations; OcrAsyncRunner dispatches to
  upsertGuidedBlock when annotationId is present; OcrService threads
  the flag through to runSingleDocument
- TranscriptionService: adds upsertGuidedBlock (creates, updates OCR,
  or preserves MANUAL blocks)
- Frontend: guided OCR toggle in OcrTrigger shown when blocks exist;
  skips destructive-replace confirmation in guided mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 15:57:54 +02:00

224 lines
6.9 KiB
Python

"""Kraken OCR engine wrapper — historical HTR model support for Kurrent/Suetterlin."""
import logging
import os
logger = logging.getLogger(__name__)
_model = None
_model_path = os.environ.get("KRAKEN_MODEL_PATH", "/app/models/german_kurrent.mlmodel")
def load_models():
"""Load the Kraken model at startup. Skips if model file is not present."""
global _model
if not os.path.exists(_model_path):
logger.warning("Kraken model not found at %s — Kurrent OCR will not be available", _model_path)
return
logger.info("Loading Kraken model from %s...", _model_path)
from kraken.lib import models as kraken_models
_model = kraken_models.load_any(_model_path)
logger.info("Kraken model loaded successfully")
def is_available() -> bool:
return _model is not None
def extract_page_blocks(image, page_idx: int, language: str = "de") -> list[dict]:
"""Run Kraken segmentation + recognition on a single PIL image.
Returns block dicts for that page. Coordinates are normalized to [0, 1].
"""
from kraken import blla, rpred
from confidence import words_from_characters
if _model is None:
raise RuntimeError("Kraken model is not loaded")
page_w, page_h = image.size
blocks = []
baseline_seg = blla.segment(image)
pred_it = rpred.rpred(_model, image, baseline_seg)
for record in pred_it:
polygon_pts = record.boundary if hasattr(record, "boundary") and record.boundary else []
if polygon_pts:
xs = [p[0] for p in polygon_pts]
ys = [p[1] for p in polygon_pts]
x1, y1 = min(xs), min(ys)
x2, y2 = max(xs), max(ys)
else:
xs = [p[0] for p in record.baseline]
ys = [p[1] for p in record.baseline]
x1, y1 = min(xs), min(ys) - 5
x2, y2 = max(xs), max(ys) + 5
quad = _approximate_to_quad(polygon_pts, page_w, page_h) if polygon_pts else None
char_confidences = getattr(record, "confidences", [])
words = words_from_characters(record.prediction, char_confidences)
blocks.append({
"pageNumber": page_idx,
"x": x1 / page_w,
"y": y1 / page_h,
"width": (x2 - x1) / page_w,
"height": (y2 - y1) / page_h,
"polygon": quad,
"text": record.prediction,
"words": words,
})
return blocks
def extract_region_text(image, x: float, y: float, w: float, h: float) -> str:
"""Crop image to a normalized region and run Kraken recognition on the crop.
Used for guided OCR — skips full-page layout detection and only processes
the given bounding box. Coordinates are normalized to [0, 1].
"""
from kraken import blla, rpred
if _model is None:
raise RuntimeError("Kraken model is not loaded")
pw, ph = image.size
x1 = max(0, int(x * pw))
y1 = max(0, int(y * ph))
x2 = min(pw, int((x + w) * pw))
y2 = min(ph, int((y + h) * ph))
crop = image.crop((x1, y1, x2, y2))
baseline_seg = blla.segment(crop)
pred_it = rpred.rpred(_model, crop, baseline_seg)
return " ".join(r.prediction for r in pred_it)
def extract_blocks(images: list, language: str = "de") -> list[dict]:
"""Run Kraken segmentation + recognition on a list of PIL images.
Returns block dicts with pageNumber, x, y, width, height, polygon, text.
Polygon is a 4-point quadrilateral approximation of the baseline polygon.
Coordinates are normalized to [0, 1].
"""
all_blocks = []
for page_idx, image in enumerate(images, start=1):
all_blocks.extend(extract_page_blocks(image, page_idx, language))
return all_blocks
def _approximate_to_quad(points: list[tuple], page_w: float, page_h: float) -> list[list[float]] | None:
"""Approximate a polygon to a 4-point quadrilateral using the minimum bounding rectangle.
Uses gift-wrapping (Jarvis march) for convex hull, then rotating calipers
for the minimum area bounding rectangle. Pure Python, no scipy/numpy.
"""
if len(points) < 3:
return None
try:
hull = _convex_hull(points)
if len(hull) < 3:
return None
rect = _min_bounding_rect(hull)
# Normalize to [0, 1]
return [[p[0] / page_w, p[1] / page_h] for p in rect]
except Exception:
logger.debug("Failed to approximate polygon to quad, returning None")
return None
def _convex_hull(points: list[tuple]) -> list[tuple]:
"""Jarvis march (gift wrapping) algorithm for 2D convex hull."""
pts = list(set(points))
if len(pts) < 3:
return pts
# Start from leftmost point
start = min(pts, key=lambda p: (p[0], p[1]))
hull = []
current = start
while True:
hull.append(current)
candidate = pts[0]
for p in pts[1:]:
if candidate == current:
candidate = p
continue
cross = _cross(current, candidate, p)
if cross < 0:
candidate = p
elif cross == 0:
# Collinear — pick the farther point
if _dist_sq(current, p) > _dist_sq(current, candidate):
candidate = p
current = candidate
if current == start:
break
return hull
def _min_bounding_rect(hull: list[tuple]) -> list[tuple]:
"""Find the minimum area bounding rectangle of a convex hull using rotating calipers."""
n = len(hull)
if n < 2:
return hull
min_area = float("inf")
best_rect = None
for i in range(n):
# Edge vector
edge_x = hull[(i + 1) % n][0] - hull[i][0]
edge_y = hull[(i + 1) % n][1] - hull[i][1]
edge_len = (edge_x ** 2 + edge_y ** 2) ** 0.5
if edge_len == 0:
continue
# Unit vectors along and perpendicular to the edge
ux, uy = edge_x / edge_len, edge_y / edge_len
vx, vy = -uy, ux
# Project all hull points onto the edge coordinate system
projs_u = [p[0] * ux + p[1] * uy for p in hull]
projs_v = [p[0] * vx + p[1] * vy for p in hull]
min_u, max_u = min(projs_u), max(projs_u)
min_v, max_v = min(projs_v), max(projs_v)
area = (max_u - min_u) * (max_v - min_v)
if area < min_area:
min_area = area
# Reconstruct 4 corners in original coordinates
best_rect = [
(min_u * ux + min_v * vx, min_u * uy + min_v * vy),
(max_u * ux + min_v * vx, max_u * uy + min_v * vy),
(max_u * ux + max_v * vx, max_u * uy + max_v * vy),
(min_u * ux + max_v * vx, min_u * uy + max_v * vy),
]
return best_rect if best_rect else hull[:4]
def _cross(o: tuple, a: tuple, b: tuple) -> float:
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
def _dist_sq(a: tuple, b: tuple) -> float:
return (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2