Adds a logging.Filter on uvicorn.access that drops records whose request path is /metrics or /health. Each is hit on a tight schedule (Prometheus scrape interval and Docker healthcheck), so unfiltered they dominate the access log without carrying any information about real traffic. Refs #652 (Nora's recommendation) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
475 lines
21 KiB
Python
475 lines
21 KiB
Python
"""Tests for Prometheus metrics exposed by the OCR service.

Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry
(see decision #3 on issue #652) to keep the metrics isolated between tests.
"""
|
|
|
|
import io
|
|
import zipfile
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
from PIL import Image
|
|
from prometheus_client import CollectorRegistry
|
|
|
|
from main import app
|
|
from metrics import build_metrics
|
|
|
|
|
|
def _minimal_zip() -> bytes:
|
|
"""Return a ZIP containing one fake .xml so endpoint validation passes."""
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, "w") as zf:
|
|
zf.writestr("page_01.xml", "<PcGts/>")
|
|
return buf.getvalue()
|
|
|
|
|
|
def _fake_training_result(accuracy: float = 0.91) -> dict:
|
|
return {"loss": None, "accuracy": accuracy, "cer": round(1 - accuracy, 4), "epochs": 5}
|
|
|
|
|
|
@pytest.fixture
def fresh_metrics(monkeypatch):
    """Point `main.metrics` at an OcrMetrics bound to a brand-new registry.

    The replacement is returned so a test can read counter/gauge values that
    no other test has touched; monkeypatch undoes the swap on teardown.
    """
    isolated_metrics = build_metrics(CollectorRegistry())
    monkeypatch.setattr("main.metrics", isolated_metrics)
    return isolated_metrics
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_metrics_endpoint_returns_200():
    """The /metrics route answers 200 with a text/plain exposition body."""
    # Stub the model and spell-checker loaders, as the other tests here do.
    with patch("main.kraken_engine.load_models"):
        with patch("main.load_spell_checker"):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.get("/metrics")

    assert response.status_code == 200
    content_type = response.headers.get("content-type", "")
    assert "text/plain" in content_type
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_metrics_includes_http_request_metrics_after_ocr_call():
    """After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # /ocr only accepts requests once models are loaded, and ASGITransport does
    # not run the app lifespan, so the readiness flag is forced on. It is
    # patched rather than assigned so the previous value is restored on exit
    # instead of being clobbered to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            ocr_response = await client.post("/ocr", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "TYPEWRITER",
                "language": "de",
            })
            assert ocr_response.status_code == 200, ocr_response.text

            metrics_response = await client.get("/metrics")

    body = metrics_response.text
    assert "http_requests_total" in body
    assert "http_request_duration_seconds" in body
|
|
|
|
|
|
def test_build_metrics_registers_all_custom_metrics_on_given_registry():
    """`build_metrics` registers every custom metric on the supplied registry only."""
    registry = CollectorRegistry()
    metrics = build_metrics(registry)

    # prometheus_client reports Counter families without their `_total`
    # suffix, which is why e.g. ocr_jobs_total is listed as "ocr_jobs".
    metric_names = {m.name for m in registry.collect()}
    expected = {
        "ocr_jobs",
        "ocr_pages",
        "ocr_skipped_pages",
        "ocr_words",
        "ocr_illegible_words",
        "ocr_processing_seconds",
        "ocr_training_runs",
        "ocr_model_accuracy",
        "ocr_models_ready",
    }
    assert expected <= metric_names, f"missing: {expected - metric_names}"

    # A second registry yields a separate container. Building it must not
    # collide with the first, and mutating it must not leak into the first.
    # An identity check alone would pass even if both shared collectors.
    other_metrics = build_metrics(CollectorRegistry())
    assert metrics is not other_metrics
    other_metrics.ocr_models_ready.set(1)
    assert metrics.ocr_models_ready._value.get() == 0.0
|
|
|
|
|
|
async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None:
    """POST a German-language /ocr request for *script_type* and require HTTP 200.

    The helper does no mocking itself. Callers must already have patched the
    PDF download and the OCR engine. On failure the response body is shown in
    the assertion message.
    """
    payload = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": script_type,
        "language": "de",
    }
    response = await client.post("/ocr", json=payload)
    assert response.status_code == 200, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics):
    """A /ocr call with HANDWRITING_KURRENT increments engine=kraken."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False. Spell correction is stubbed to the identity.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main.correct_text", side_effect=lambda t: t), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.kraken_engine.is_available", return_value=True), \
         patch("main.kraken_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="HANDWRITING_KURRENT")

    value = fresh_metrics.ocr_jobs_total.labels(
        engine="kraken", script_type="HANDWRITING_KURRENT"
    )._value.get()
    assert value == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics):
    """A /ocr call with TYPEWRITER increments engine=surya."""
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    value = fresh_metrics.ocr_jobs_total.labels(
        engine="surya", script_type="TYPEWRITER"
    )._value.get()
    assert value == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics):
    """The /ocr/stream generator increments ocr_pages_total per successful page."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(3)]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "TYPEWRITER",
                "language": "de",
            }) as response:
                assert response.status_code == 200
                # Drain the stream so all per-page increments fire.
                async for _ in response.aiter_lines():
                    pass

    value = fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get()
    assert value == 3.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_skipped_pages_total_incremented_when_engine_raises_for_a_page(fresh_metrics):
    """When the engine raises on a page, ocr_skipped_pages_total bumps and the stream finishes."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
    good_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]

    call_count = {"n": 0}

    def extract_side_effect(*args, **kwargs):
        # Fail on the first page only; every later page succeeds.
        call_count["n"] += 1
        if call_count["n"] == 1:
            raise RuntimeError("synthetic engine failure")
        return good_blocks

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", side_effect=extract_side_effect):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "TYPEWRITER",
                "language": "de",
            }) as response:
                assert response.status_code == 200
                # Read the whole stream. The failing page should appear as an
                # error event rather than aborting the response.
                saw_error = False
                async for line in response.aiter_lines():
                    if line and '"type": "error"' in line:
                        saw_error = True
                assert saw_error

    assert fresh_metrics.ocr_skipped_pages_total._value.get() == 1.0
    # The second page still succeeds.
    assert fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metrics):
    """Counters reflect totals summed over every block in the request.

    For non-Kurrent scripts the illegibility threshold defaults to
    THRESHOLD_DEFAULT (0.3). The two blocks below hold five words in total.
    Three have confidence above the threshold (0.9, 0.8, 0.95) and two fall
    below it (0.1, 0.05), so 5 words and 2 illegible words are expected.
    """
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
         "polygon": None, "text": "ignored",
         "words": [{"text": "Lieber", "confidence": 0.9},
                   {"text": "Freund", "confidence": 0.1}]},
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
         "polygon": None, "text": "ignored",
         "words": [{"text": "Gruss", "confidence": 0.8},
                   {"text": "verschmiert", "confidence": 0.05},
                   {"text": "Karl", "confidence": 0.95}]},
    ]

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    assert fresh_metrics.ocr_words_total._value.get() == 5.0
    assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0
|
|
|
|
|
|
def _histogram_count_sum(histogram, **labels) -> tuple[float, float]:
|
|
"""Read the per-label-set _count and _sum from a prometheus_client Histogram."""
|
|
child = histogram.labels(**labels)
|
|
return child._sum.get(), sum(b.get() for b in child._buckets)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics):
    """The streaming generator observes ocr_processing_seconds once per page."""
    mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]

    # ASGITransport does not run the app lifespan, so the readiness flag is
    # forced on. patch() restores the previous value on exit instead of
    # clobbering it to False.
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            async with client.stream("POST", "/ocr/stream", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "TYPEWRITER",
                "language": "de",
            }) as response:
                assert response.status_code == 200
                # Drain the stream so every per-page observation is recorded.
                async for _ in response.aiter_lines():
                    pass

    # The helper returns (sum, count), in that order.
    sum_seconds, count = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="surya"
    )
    assert count == 2.0
    assert sum_seconds >= 0.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label(fresh_metrics):
    """A successful /train call counts one {kind=recognition, outcome=success} run."""

    async def succeed(func, *args, **kwargs):
        # Replace the threaded training job with a canned result.
        return _fake_training_result()

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"):
        with patch("main._models_ready", True):
            with patch("main.asyncio.to_thread", side_effect=succeed):
                transport = ASGITransport(app=app)
                async with AsyncClient(transport=transport, base_url="http://test") as client:
                    response = await client.post("/train", files=upload, headers=auth)

    assert response.status_code == 200
    successes = fresh_metrics.ocr_training_runs_total.labels(kind="recognition", outcome="success")
    assert successes._value.get() == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_error_label(fresh_metrics):
    """A failing /train run bumps the error counter and the exception escapes as HTTP 500.

    ``raise_app_exceptions=False`` makes ASGITransport return the unhandled
    exception as a 500 response instead of re-raising it into the test.
    """

    async def explode(func, *args, **kwargs):
        raise RuntimeError("ketos train failed (exit 1): synthetic")

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"):
        with patch("main._models_ready", True):
            with patch("main.asyncio.to_thread", side_effect=explode):
                lenient_transport = ASGITransport(app=app, raise_app_exceptions=False)
                async with AsyncClient(transport=lenient_transport, base_url="http://test") as client:
                    response = await client.post("/train", files=upload, headers=auth)

    assert response.status_code == 500
    failures = fresh_metrics.ocr_training_runs_total.labels(kind="recognition", outcome="error")
    assert failures._value.get() == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_segmentation_success_label(fresh_metrics):
    """A successful /segtrain call counts one {kind=segmentation, outcome=success} run."""

    async def succeed(func, *args, **kwargs):
        # Replace the threaded segmentation training with a canned result.
        return _fake_training_result(accuracy=0.83)

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth = {"X-Training-Token": "secret-token"}

    with patch("main.TRAINING_TOKEN", "secret-token"):
        with patch("main._models_ready", True):
            with patch("main.asyncio.to_thread", side_effect=succeed):
                transport = ASGITransport(app=app)
                async with AsyncClient(transport=transport, base_url="http://test") as client:
                    response = await client.post("/segtrain", files=upload, headers=auth)

    assert response.status_code == 200
    successes = fresh_metrics.ocr_training_runs_total.labels(kind="segmentation", outcome="success")
    assert successes._value.get() == 1.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_set_per_kind_after_successful_training(fresh_metrics):
    """Successful /train and /segtrain runs each set ocr_model_accuracy{kind=...}.

    The two routes are called in order, /train first and then /segtrain. Each
    call runs with its own mocked ``asyncio.to_thread`` that reports a
    distinct accuracy.
    """
    recognition_accuracy = 0.917
    segmentation_accuracy = 0.834
    training_calls = (
        ("/train", recognition_accuracy),
        ("/segtrain", segmentation_accuracy),
    )

    def _returning(accuracy):
        # Build a fake to_thread bound to this accuracy (avoids late binding).
        async def fake_to_thread(func, *args, **kwargs):
            return _fake_training_result(accuracy=accuracy)
        return fake_to_thread

    with patch("main.TRAINING_TOKEN", "secret-token"), patch("main._models_ready", True):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            for route, accuracy in training_calls:
                with patch("main.asyncio.to_thread", side_effect=_returning(accuracy)):
                    resp = await client.post(
                        route,
                        files={"file": ("training.zip", _minimal_zip(), "application/zip")},
                        headers={"X-Training-Token": "secret-token"},
                    )
                    assert resp.status_code == 200

    gauge = fresh_metrics.ocr_model_accuracy
    assert gauge.labels(kind="recognition")._value.get() == pytest.approx(recognition_accuracy)
    assert gauge.labels(kind="segmentation")._value.get() == pytest.approx(segmentation_accuracy)
|
|
|
|
|
|
def test_ocr_models_ready_gauge_defaults_to_zero():
    """Before any lifespan runs, a newly built OcrMetrics reports ocr_models_ready == 0."""
    ready_gauge = build_metrics(CollectorRegistry()).ocr_models_ready
    assert ready_gauge._value.get() == 0.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ocr_models_ready_gauge_is_one_after_lifespan_startup(fresh_metrics):
    """ocr_models_ready reads 1 inside the lifespan once the loaders have returned.

    ASGITransport does not drive the lifespan by default. This test therefore
    enters ``app.router.lifespan_context`` itself, with both loaders stubbed,
    to exercise the startup path.
    """
    ready_gauge = fresh_metrics.ocr_models_ready
    assert ready_gauge._value.get() == 0.0

    with patch("main.kraken_engine.load_models"):
        with patch("main.load_spell_checker"):
            async with app.router.lifespan_context(app):
                assert ready_gauge._value.get() == 1.0
|
|
|
|
|
|
def test_uvicorn_access_log_filter_skips_metrics_path():
    """MetricsPathFilter drops uvicorn.access records for /metrics and /health.

    Other paths, here POST /ocr, must still be logged. Every record uses the
    same message template and positional arguments: client address, method,
    path, HTTP version and status code.
    """
    import logging as _logging

    from main import MetricsPathFilter

    def access_record(method, path):
        # Build one uvicorn.access-style record for the given request line.
        return _logging.LogRecord(
            name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
            msg='%s - "%s %s HTTP/%s" %d',
            args=("127.0.0.1:1234", method, path, "1.1", 200),
            exc_info=None,
        )

    access_filter = MetricsPathFilter()
    assert access_filter.filter(access_record("GET", "/metrics")) is False
    assert access_filter.filter(access_record("GET", "/health")) is False
    assert access_filter.filter(access_record("POST", "/ocr")) is True
|