Files
familienarchiv/ocr-service/test_metrics.py
Marcel d6abf990c7 feat(ocr): flip ocr_models_ready to 1 once the lifespan startup finishes
Mirrors the existing _models_ready bool so Prometheus has a time-series
liveness/readiness signal for future alerting rules (e.g.
ocr_models_ready < 1 for 2m).

Refs #652 (AC7)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 16:15:11 +02:00

445 lines
20 KiB
Python

"""Tests for Prometheus metrics exposed by the OCR service.
Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry
(see decision #3 on issue #652) to keep the metrics isolated between tests.
"""
import io
import zipfile
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from PIL import Image
from prometheus_client import CollectorRegistry
from main import app
from metrics import build_metrics
def _minimal_zip() -> bytes:
"""Return a ZIP containing one fake .xml so endpoint validation passes."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("page_01.xml", "<PcGts/>")
return buf.getvalue()
def _fake_training_result(accuracy: float = 0.91) -> dict:
return {"loss": None, "accuracy": accuracy, "cer": round(1 - accuracy, 4), "epochs": 5}
@pytest.fixture
def fresh_metrics(monkeypatch):
    """Swap the module-level `main.metrics` for one built on a brand-new registry.

    Returns the replacement so tests can read counter/gauge values directly.
    The swap goes through ``monkeypatch.setattr``, so it is scoped to the
    requesting test.
    """
    isolated_metrics = build_metrics(CollectorRegistry())
    monkeypatch.setattr("main.metrics", isolated_metrics)
    return isolated_metrics
@pytest.mark.asyncio
async def test_metrics_endpoint_returns_200():
    """`GET /metrics` answers 200 and advertises a text/plain exposition body."""
    transport = ASGITransport(app=app)
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get("/metrics")

    content_type = response.headers.get("content-type", "")
    assert response.status_code == 200
    assert "text/plain" in content_type
@pytest.mark.asyncio
async def test_metrics_includes_http_request_metrics_after_ocr_call():
    """After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]
    ocr_request = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    # Models need to be loaded for /ocr to accept requests, so the readiness
    # flag is forced on. Using `patch` (as the /train tests do) restores the
    # previous value on exit instead of unconditionally resetting it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            ocr_response = await client.post("/ocr", json=ocr_request)
            assert ocr_response.status_code == 200, ocr_response.text
            metrics_response = await client.get("/metrics")

    body = metrics_response.text
    assert "http_requests_total" in body
    assert "http_request_duration_seconds" in body
def test_build_metrics_registers_all_custom_metrics_on_given_registry():
    """`build_metrics` registers every custom OCR metric on the registry it is handed."""
    registry = CollectorRegistry()
    metrics = build_metrics(registry)

    # Names as reported by `registry.collect()`; presumably counter families
    # are listed without their `_total` suffix — verify against prometheus_client.
    expected = {
        "ocr_jobs",
        "ocr_pages",
        "ocr_skipped_pages",
        "ocr_words",
        "ocr_illegible_words",
        "ocr_processing_seconds",
        "ocr_training_runs",
        "ocr_model_accuracy",
        "ocr_models_ready",
    }
    registered = {family.name for family in registry.collect()}
    missing = expected - registered
    assert not missing, f"missing: {missing}"

    # Building against a different registry must hand back a distinct container.
    second_container = build_metrics(CollectorRegistry())
    assert second_container is not metrics
async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None:
"""Helper — fires /ocr with a single mocked page and asserts a 200."""
response = await client.post("/ocr", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": script_type,
"language": "de",
})
assert response.status_code == 200, response.text
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics):
    """A /ocr call with HANDWRITING_KURRENT increments engine=kraken."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main.correct_text", side_effect=lambda t: t), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.kraken_engine.is_available", return_value=True), \
         patch("main.kraken_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="HANDWRITING_KURRENT")

    value = fresh_metrics.ocr_jobs_total.labels(
        engine="kraken", script_type="HANDWRITING_KURRENT"
    )._value.get()
    assert value == 1.0
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics):
    """A /ocr call with TYPEWRITER increments engine=surya."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    value = fresh_metrics.ocr_jobs_total.labels(
        engine="surya", script_type="TYPEWRITER"
    )._value.get()
    assert value == 1.0
@pytest.mark.asyncio
async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics):
    """The /ocr/stream generator increments ocr_pages_total per successful page."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(3)]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]
    stream_request = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json=stream_request) as response:
                assert response.status_code == 200
                # Drain the stream so all per-page increments fire.
                async for _ in response.aiter_lines():
                    pass

    value = fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get()
    assert value == 3.0
@pytest.mark.asyncio
async def test_ocr_skipped_pages_total_incremented_when_engine_raises_for_a_page(fresh_metrics):
    """When the engine raises on a page, ocr_skipped_pages_total bumps and the stream finishes."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
    good_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]
    stream_request = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }
    calls = 0

    def extract_side_effect(*args, **kwargs):
        """Fail on the first invocation only; every later call succeeds."""
        nonlocal calls
        calls += 1
        if calls == 1:
            raise RuntimeError("synthetic engine failure")
        return good_blocks

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", side_effect=extract_side_effect):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json=stream_request) as response:
                assert response.status_code == 200
                # Read the stream to the end (do not stop at the first error
                # event) so the second page is processed as well.
                saw_error = False
                async for line in response.aiter_lines():
                    if line and '"type": "error"' in line:
                        saw_error = True
                assert saw_error

    assert fresh_metrics.ocr_skipped_pages_total._value.get() == 1.0
    # The second page still succeeds.
    assert fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metrics):
    """Counters reflect totals summed over every block in the request.

    Threshold defaults to THRESHOLD_DEFAULT (0.3) for non-Kurrent scripts. Two
    blocks: 3 words above + 2 words below threshold across blocks.
    """
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
         "polygon": None, "text": "ignored",
         "words": [{"text": "Lieber", "confidence": 0.9},
                   {"text": "Freund", "confidence": 0.1}]},
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
         "polygon": None, "text": "ignored",
         "words": [{"text": "Gruss", "confidence": 0.8},
                   {"text": "verschmiert", "confidence": 0.05},
                   {"text": "Karl", "confidence": 0.95}]},
    ]

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    # 2 + 3 words overall; "Freund" (0.1) and "verschmiert" (0.05) are the low-confidence ones.
    assert fresh_metrics.ocr_words_total._value.get() == 5.0
    assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0
def _histogram_count_sum(histogram, **labels) -> tuple[float, float]:
"""Read the per-label-set _count and _sum from a prometheus_client Histogram."""
child = histogram.labels(**labels)
return child._sum.get(), sum(b.get() for b in child._buckets)
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics):
    """The streaming generator observes ocr_processing_seconds once per page."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]
    stream_request = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    # `_models_ready` is forced on via `patch` so the previous value is
    # restored on exit (the /train tests use the same approach).
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json=stream_request) as response:
                assert response.status_code == 200
                # Drain the stream so the observation for every page is recorded.
                async for _ in response.aiter_lines():
                    pass

    # The helper returns (sum, count), in that order.
    sum_seconds, count = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="surya"
    )
    assert count == 2.0
    assert sum_seconds >= 0.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label(fresh_metrics):
    """A successful /train bumps ocr_training_runs_total{kind=recognition, outcome=success}."""
    async def training_stub(func, *args, **kwargs):
        # Stands in for `asyncio.to_thread`, returning a canned result.
        return _fake_training_result()

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=training_stub):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/train", files=upload, headers=auth_headers)

    assert response.status_code == 200
    success_counter = fresh_metrics.ocr_training_runs_total.labels(
        kind="recognition", outcome="success"
    )
    assert success_counter._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_error_label(fresh_metrics):
    """When /train's inner runner raises, the error counter bumps and the request ends in a 500."""
    async def failing_stub(func, *args, **kwargs):
        # Stands in for `asyncio.to_thread` and simulates a failed training run.
        raise RuntimeError("ketos train failed (exit 1): synthetic")

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}
    # raise_app_exceptions=False makes the transport answer with a 500
    # response instead of re-raising the app's exception into the test.
    transport = ASGITransport(app=app, raise_app_exceptions=False)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=failing_stub):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/train", files=upload, headers=auth_headers)

    assert response.status_code == 500
    error_counter = fresh_metrics.ocr_training_runs_total.labels(
        kind="recognition", outcome="error"
    )
    assert error_counter._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_segmentation_success_label(fresh_metrics):
    """A successful /segtrain bumps ocr_training_runs_total{kind=segmentation, outcome=success}."""
    async def training_stub(func, *args, **kwargs):
        # Stands in for `asyncio.to_thread`, returning a canned result.
        return _fake_training_result(accuracy=0.83)

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=training_stub):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/segtrain", files=upload, headers=auth_headers)

    assert response.status_code == 200
    success_counter = fresh_metrics.ocr_training_runs_total.labels(
        kind="segmentation", outcome="success"
    )
    assert success_counter._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_set_per_kind_after_successful_training(fresh_metrics):
    """Once /train and /segtrain have succeeded, ocr_model_accuracy{kind=...} holds each result."""
    recognition_accuracy = 0.917
    segmentation_accuracy = 0.834

    def stub_returning(accuracy):
        """Build an `asyncio.to_thread` stand-in that reports *accuracy*."""
        async def _stub(func, *args, **kwargs):
            return _fake_training_result(accuracy=accuracy)
        return _stub

    auth_headers = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            # Recognition run first ...
            with patch("main.asyncio.to_thread", side_effect=stub_returning(recognition_accuracy)):
                rec_resp = await client.post(
                    "/train",
                    files={"file": ("training.zip", _minimal_zip(), "application/zip")},
                    headers=auth_headers,
                )
            assert rec_resp.status_code == 200

            # ... then a segmentation run with a different accuracy.
            with patch("main.asyncio.to_thread", side_effect=stub_returning(segmentation_accuracy)):
                seg_resp = await client.post(
                    "/segtrain",
                    files={"file": ("training.zip", _minimal_zip(), "application/zip")},
                    headers=auth_headers,
                )
            assert seg_resp.status_code == 200

    accuracy_gauge = fresh_metrics.ocr_model_accuracy
    assert accuracy_gauge.labels(kind="recognition")._value.get() == pytest.approx(recognition_accuracy)
    assert accuracy_gauge.labels(kind="segmentation")._value.get() == pytest.approx(segmentation_accuracy)
def test_ocr_models_ready_gauge_defaults_to_zero():
    """Before any lifespan has run, a newly built OcrMetrics reports ocr_models_ready=0."""
    ready_gauge = build_metrics(CollectorRegistry()).ocr_models_ready
    assert ready_gauge._value.get() == 0.0
@pytest.mark.asyncio
async def test_ocr_models_ready_gauge_is_one_after_lifespan_startup(fresh_metrics):
    """Lifespan startup sets ocr_models_ready to 1 after the (patched) loaders return.

    ASGITransport does not run lifespan by default, so the app's lifespan
    context manager is entered directly to exercise the startup path.
    """
    ready_gauge = fresh_metrics.ocr_models_ready
    # Precondition: nothing has flipped the gauge yet.
    assert ready_gauge._value.get() == 0.0

    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"):
        async with app.router.lifespan_context(app):
            assert ready_gauge._value.get() == 1.0