Files
familienarchiv/ocr-service/test_metrics.py
Marcel 0801da8df0
Some checks failed
CI / Backend Unit Tests (push) Successful in 3m42s
CI / fail2ban Regex (push) Successful in 43s
CI / Semgrep Security Scan (push) Successful in 19s
CI / Compose Bucket Idempotency (push) Successful in 1m0s
CI / Unit & Component Tests (pull_request) Successful in 3m24s
CI / OCR Service Tests (pull_request) Successful in 20s
CI / Backend Unit Tests (pull_request) Successful in 3m28s
CI / fail2ban Regex (pull_request) Successful in 43s
CI / Semgrep Security Scan (pull_request) Successful in 19s
CI / Compose Bucket Idempotency (pull_request) Successful in 1m1s
CI / Unit & Component Tests (push) Failing after 2m44s
CI / OCR Service Tests (push) Successful in 20s
docs(ocr): explain why two metrics tests skip fresh_metrics fixture
Sara's cycle-2 S2: clarify the latent (but not actual) cross-test state
risk on the two metrics tests that hit the global REGISTRY instead of
the per-test fresh_metrics fixture. Migrating them would actually break
them — the /metrics endpoint is served by prometheus-fastapi-instrumentator
which binds to the default REGISTRY at app-construction time, and the
http_requests_total assertion only finds counters on that global
registry. Both tests already assert response shape only (status code,
content-type substring, body substrings), not numeric values, so the
shared-registry caveat is documented for future readers rather than
treated as a bug to fix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 17:23:32 +02:00

639 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for Prometheus metrics exposed by the OCR service.
Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry
(see decision #3 on issue #652) to keep the metrics isolated between tests.
"""
import contextlib
import io
import zipfile
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from PIL import Image
from prometheus_client import CollectorRegistry
from main import app
from metrics import build_metrics
@contextlib.asynccontextmanager
async def ocr_client(*, raise_app_exceptions: bool = True):
    """Yield an AsyncClient with model loaders stubbed and `_models_ready` forced on.

    The shared setup for almost every metrics test: stub the heavy lifecycle
    hooks (kraken_engine.load_models, load_spell_checker) and flip the
    readiness flag so request handlers do not 503.

    The flag is patched rather than assigned, so on exit it goes back to
    whatever value it held on entry instead of being hard-reset to False.
    This is the same `patch("main._models_ready", True)` idiom the training
    tests in this module use.

    Args:
        raise_app_exceptions: Forwarded to `ASGITransport`. Tests that assert
            on a 500 response from a failing handler pass False.

    Yields:
        An `AsyncClient` wired to the app under test with base URL
        `http://test`.
    """
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._models_ready", True):
        transport = ASGITransport(app=app, raise_app_exceptions=raise_app_exceptions)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client
def _minimal_zip() -> bytes:
    """Build an in-memory ZIP archive holding a single stub `page_01.xml` member.

    The upload endpoints only need an archive with one fake .xml entry so that
    endpoint validation passes; the XML payload itself is a bare `<PcGts/>`.
    """
    payload = io.BytesIO()
    with zipfile.ZipFile(payload, mode="w") as archive:
        archive.writestr("page_01.xml", "<PcGts/>")
    return payload.getvalue()
def _fake_training_result(accuracy: float = 0.91) -> dict:
    """Return a canned training-result dict for the given accuracy.

    `cer` is derived as `1 - accuracy` rounded to four decimals; `loss` is
    always None and `epochs` is fixed at 5.
    """
    character_error_rate = round(1 - accuracy, 4)
    return dict(loss=None, accuracy=accuracy, cer=character_error_rate, epochs=5)
@pytest.fixture
def fresh_metrics(monkeypatch):
    """Swap the module-level `main.metrics` for one built on a brand-new registry.

    Returns the replacement metrics container so tests can read counter and
    gauge values from it directly; monkeypatch undoes the swap after the test.
    """
    isolated_metrics = build_metrics(CollectorRegistry())
    monkeypatch.setattr("main.metrics", isolated_metrics)
    return isolated_metrics
@pytest.mark.asyncio
async def test_metrics_endpoint_returns_200():
    """`GET /metrics` answers 200 with a text/plain (Prometheus exposition) content type.

    Runs against the global REGISTRY on purpose and does NOT request the
    `fresh_metrics` fixture. The `/metrics` endpoint is wired by
    `prometheus-fastapi-instrumentator`, which binds to the default REGISTRY at
    app-construction time, so swapping `main.metrics` via the fixture would not
    change what `/metrics` exposes. Only the response shape is asserted (status
    code + content-type substring), never numeric counter values, so state
    leaked from other tests cannot affect the outcome.
    """
    transport = ASGITransport(app=app)
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"):
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.get("/metrics")

    assert reply.status_code == 200
    content_type = reply.headers.get("content-type", "")
    assert "text/plain" in content_type
@pytest.mark.asyncio
async def test_metrics_includes_http_request_metrics_after_ocr_call():
    """After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics.

    Uses the global REGISTRY by design — does NOT take the `fresh_metrics` fixture.
    The `http_requests_total` / `http_request_duration_seconds` metrics live on
    the instrumentator's default REGISTRY (not on `main.metrics`), so a fresh
    CollectorRegistry would never see them. This test only asserts response shape
    (status code and substring presence in the exposition body), not numeric
    counter values, so cross-test state leakage cannot affect it.
    """
    mock_images = [Image.new("RGB", (100, 100))]
    mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
        async with ocr_client() as client:
            ocr_response = await client.post("/ocr", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "TYPEWRITER",
                "language": "de",
            })
            assert ocr_response.status_code == 200, ocr_response.text
            metrics_response = await client.get("/metrics")

    # Check the status first so a broken /metrics endpoint fails with a clear
    # message instead of a misleading "substring not in body" assertion.
    assert metrics_response.status_code == 200, metrics_response.text
    body = metrics_response.text
    assert "http_requests_total" in body
    assert "http_request_duration_seconds" in body
def test_build_metrics_registers_all_custom_metrics_on_given_registry():
    """`build_metrics` registers every custom OCR metric on the registry it is given."""
    registry = CollectorRegistry()
    metrics = build_metrics(registry)

    expected = {
        "ocr_jobs",
        "ocr_pages",
        "ocr_skipped_pages",
        "ocr_words",
        "ocr_illegible_words",
        "ocr_processing_seconds",
        "ocr_training_runs",
        "ocr_model_accuracy",
        "ocr_models_ready",
    }
    registered = {family.name for family in registry.collect()}
    missing = expected - registered
    assert not missing, f"missing: {missing}"

    # Building against another registry must hand back a distinct container,
    # i.e. no shared state between the two.
    second_metrics = build_metrics(CollectorRegistry())
    assert metrics is not second_metrics
async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None:
    """POST a fixed request to /ocr with the given script type and require a 200.

    The response body is used as the assertion message so a failure shows what
    the service answered.
    """
    payload = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": script_type,
        "language": "de",
    }
    response = await client.post("/ocr", json=payload)
    assert response.status_code == 200, response.text
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics):
    """One HANDWRITING_KURRENT request to /ocr leaves ocr_jobs_total{engine=kraken} at 1."""
    single_page = [Image.new("RGB", (100, 100))]
    engine_output = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                      "polygon": None, "text": "hi", "words": []}]

    with patch("main.correct_text", side_effect=lambda t: t), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=single_page), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.kraken_engine.is_available", return_value=True), \
         patch("main.kraken_engine.extract_blocks", return_value=engine_output):
        async with ocr_client() as client:
            await _drive_ocr(client, script_type="HANDWRITING_KURRENT")

    kraken_jobs = fresh_metrics.ocr_jobs_total.labels(
        engine="kraken", script_type="HANDWRITING_KURRENT"
    )
    assert kraken_jobs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics):
    """One TYPEWRITER request to /ocr leaves ocr_jobs_total{engine=surya} at 1."""
    single_page = [Image.new("RGB", (100, 100))]
    engine_output = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                      "polygon": None, "text": "hi", "words": []}]

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=single_page), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=engine_output):
        async with ocr_client() as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    surya_jobs = fresh_metrics.ocr_jobs_total.labels(
        engine="surya", script_type="TYPEWRITER"
    )
    assert surya_jobs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics):
    """Streaming a three-page document through /ocr/stream leaves ocr_pages_total at 3."""
    three_pages = [Image.new("RGB", (100, 100)) for _ in range(3)]
    page_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "hi", "words": []}]
    request_body = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=three_pages), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=page_blocks):
        async with ocr_client() as client:
            async with client.stream("POST", "/ocr/stream", json=request_body) as response:
                assert response.status_code == 200
                # Consume the whole stream; the per-page increments only fire
                # as the generator is drained.
                async for _ in response.aiter_lines():
                    pass

    pages_counter = fresh_metrics.ocr_pages_total.labels(engine="surya")
    assert pages_counter._value.get() == 3.0
@pytest.mark.asyncio
async def test_ocr_skipped_pages_total_incremented_when_engine_raises_for_a_page(fresh_metrics):
    """An engine failure on one page bumps ocr_skipped_pages_total while the stream completes.

    The engine stub raises on its first call and succeeds afterwards, so of the
    two pages one is skipped and one is counted as processed.
    """
    two_pages = [Image.new("RGB", (100, 100)) for _ in range(2)]
    good_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]
    request_body = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }
    calls = 0

    def fail_on_first_call(*args, **kwargs):
        nonlocal calls
        calls += 1
        if calls == 1:
            raise RuntimeError("synthetic engine failure")
        return good_blocks

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=two_pages), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", side_effect=fail_on_first_call):
        async with ocr_client() as client:
            async with client.stream("POST", "/ocr/stream", json=request_body) as response:
                assert response.status_code == 200
                streamed_lines = [line async for line in response.aiter_lines()]

    # At least one streamed event must be flagged as an error.
    assert any('"type": "error"' in line for line in streamed_lines)
    assert fresh_metrics.ocr_skipped_pages_total._value.get() == 1.0
    # The page after the failure is still processed and counted.
    assert fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metrics):
    """Word counters are totals over every block returned for the request.

    Threshold defaults to THRESHOLD_DEFAULT (0.3) for non-Kurrent scripts. The
    two blocks together hold five words: three with confidence above the
    threshold and two below it.
    """
    def block_with(words):
        return {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                "polygon": None, "text": "ignored", "words": words}

    single_page = [Image.new("RGB", (100, 100))]
    first_block = block_with([
        {"text": "Lieber", "confidence": 0.9},
        {"text": "Freund", "confidence": 0.1},
    ])
    second_block = block_with([
        {"text": "Gruss", "confidence": 0.8},
        {"text": "verschmiert", "confidence": 0.05},
        {"text": "Karl", "confidence": 0.95},
    ])
    engine_output = [first_block, second_block]

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=single_page), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_blocks", return_value=engine_output):
        async with ocr_client() as client:
            await _drive_ocr(client, script_type="TYPEWRITER")

    total_words = fresh_metrics.ocr_words_total._value.get()
    illegible_words = fresh_metrics.ocr_illegible_words_total._value.get()
    assert total_words == 5.0
    assert illegible_words == 2.0
def _histogram_count_sum(histogram, **labels) -> tuple[float, float]:
    """Read the observed sum and observation count for one label set of a Histogram.

    NOTE: despite the function name, the result is ordered `(sum, count)` —
    callers unpack it as `sum_seconds, count = ...`. The count is obtained by
    adding up the values of the child's `_buckets`.
    """
    child = histogram.labels(**labels)
    observed_sum = child._sum.get()
    observation_count = sum(bucket.get() for bucket in child._buckets)
    return observed_sum, observation_count
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics):
    """Streaming two pages records two ocr_processing_seconds observations for surya."""
    two_pages = [Image.new("RGB", (100, 100)) for _ in range(2)]
    page_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
                    "polygon": None, "text": "ok", "words": []}]
    request_body = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=two_pages), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=page_blocks):
        async with ocr_client() as client:
            async with client.stream("POST", "/ocr/stream", json=request_body) as response:
                assert response.status_code == 200
                # Drain the stream so every per-page observation is recorded.
                async for _ in response.aiter_lines():
                    pass

    # The helper returns (sum, count), in that order.
    total_seconds, observations = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="surya"
    )
    assert observations == 2.0
    assert total_seconds >= 0.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label(fresh_metrics):
    """A successful /train call bumps ocr_training_runs_total{kind=recognition, outcome=success}."""
    async def fake_to_thread(func, *args, **kwargs):
        # Short-circuit the threaded training call with a canned result.
        return _fake_training_result()

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}
    transport = ASGITransport(app=app)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=fake_to_thread):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/train", files=upload, headers=auth_headers)

    assert response.status_code == 200
    success_runs = fresh_metrics.ocr_training_runs_total.labels(
        kind="recognition", outcome="success"
    )
    assert success_runs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_error_label(fresh_metrics):
    """A non-zero ketos exit bumps the error counter and surfaces as a 500.

    Uses the narrowest available seam — `subprocess.run` returning a failing
    CompletedProcess — instead of stubbing the asyncio.to_thread boundary,
    so the test exercises the real _run_training error path.
    """
    from subprocess import CompletedProcess

    ketos_failure = CompletedProcess(
        args=["ketos"], returncode=1, stdout="", stderr="synthetic ketos failure"
    )
    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}
    # raise_app_exceptions=False so the failure comes back as a response
    # instead of propagating into the test.
    transport = ASGITransport(app=app, raise_app_exceptions=False)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.subprocess.run", return_value=ketos_failure):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/train", files=upload, headers=auth_headers)

    assert response.status_code == 500
    error_runs = fresh_metrics.ocr_training_runs_total.labels(
        kind="recognition", outcome="error"
    )
    assert error_runs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_segmentation_success_label(fresh_metrics):
    """A successful /segtrain call bumps ocr_training_runs_total{kind=segmentation, outcome=success}."""
    async def fake_to_thread(func, *args, **kwargs):
        # Short-circuit the threaded training call with a canned result.
        return _fake_training_result(accuracy=0.83)

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}
    transport = ASGITransport(app=app)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=fake_to_thread):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/segtrain", files=upload, headers=auth_headers)

    assert response.status_code == 200
    success_runs = fresh_metrics.ocr_training_runs_total.labels(
        kind="segmentation", outcome="success"
    )
    assert success_runs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label_for_train_sender(fresh_metrics):
    """A successful /train-sender call bumps ocr_training_runs_total{kind=recognition, outcome=success}."""
    async def fake_to_thread(func, *args, **kwargs):
        # Short-circuit the threaded training call with a canned result.
        return _fake_training_result()

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    form_fields = {"output_model_path": "/app/models/sender_test.mlmodel"}
    auth_headers = {"X-Training-Token": "secret-token"}
    transport = ASGITransport(app=app)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=fake_to_thread):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post(
                "/train-sender", files=upload, data=form_fields, headers=auth_headers
            )

    assert response.status_code == 200, response.text
    success_runs = fresh_metrics.ocr_training_runs_total.labels(
        kind="recognition", outcome="success"
    )
    assert success_runs._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_stays_default_when_training_returns_no_accuracy(fresh_metrics):
    """If the runner reports accuracy=None, ocr_model_accuracy keeps its default of 0."""
    result_without_accuracy = {"loss": None, "accuracy": None, "cer": None, "epochs": 5}

    async def fake_to_thread(func, *args, **kwargs):
        return result_without_accuracy

    upload = {"file": ("training.zip", _minimal_zip(), "application/zip")}
    auth_headers = {"X-Training-Token": "secret-token"}
    transport = ASGITransport(app=app)

    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True), \
         patch("main.asyncio.to_thread", side_effect=fake_to_thread):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/train", files=upload, headers=auth_headers)

    assert response.status_code == 200
    # The gauge was never .set(); looking up the label child still creates it
    # with the default value 0.0.
    recognition_gauge = fresh_metrics.ocr_model_accuracy.labels(kind="recognition")
    assert recognition_gauge._value.get() == 0.0
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_set_per_kind_after_successful_training(fresh_metrics):
    """Once /train and /segtrain succeed, ocr_model_accuracy{kind=...} holds each run's accuracy."""
    # (endpoint, gauge kind label, accuracy the stubbed runner reports)
    scenarios = (
        ("/train", "recognition", 0.917),
        ("/segtrain", "segmentation", 0.834),
    )

    def to_thread_reporting(accuracy):
        async def fake_to_thread(func, *args, **kwargs):
            return _fake_training_result(accuracy=accuracy)
        return fake_to_thread

    transport = ASGITransport(app=app)
    with patch("main.TRAINING_TOKEN", "secret-token"), \
         patch("main._models_ready", True):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            # Both endpoints are hit through the same client, each with its own
            # stubbed training result.
            for endpoint, _kind, accuracy in scenarios:
                with patch("main.asyncio.to_thread", side_effect=to_thread_reporting(accuracy)):
                    response = await client.post(
                        endpoint,
                        files={"file": ("training.zip", _minimal_zip(), "application/zip")},
                        headers={"X-Training-Token": "secret-token"},
                    )
                assert response.status_code == 200

    for _endpoint, kind, accuracy in scenarios:
        gauge_value = fresh_metrics.ocr_model_accuracy.labels(kind=kind)._value.get()
        assert gauge_value == pytest.approx(accuracy)
def test_ocr_models_ready_gauge_defaults_to_zero():
    """On a newly built OcrMetrics, ocr_models_ready reads 0 (lifespan has not run)."""
    ready_gauge = build_metrics(CollectorRegistry()).ocr_models_ready
    assert ready_gauge._value.get() == 0.0
@pytest.mark.asyncio
async def test_ocr_models_ready_gauge_is_one_after_lifespan_startup(fresh_metrics):
    """Lifespan startup sets ocr_models_ready to 1 once the (stubbed) loaders return.

    ASGITransport does not run lifespan by default, so the lifespan context
    manager is driven directly to exercise the startup code path.
    """
    def ready_value():
        return fresh_metrics.ocr_models_ready._value.get()

    # Before startup the gauge is still at its default.
    assert ready_value() == 0.0

    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"):
        async with app.router.lifespan_context(app):
            assert ready_value() == 1.0
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_guided_stream(fresh_metrics):
    """A guided (region-based) stream over two pages records two ocr_processing_seconds observations."""
    two_pages = [Image.new("RGB", (100, 100)) for _ in range(2)]
    # One region per page.
    regions = [
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"},
        {"pageNumber": 2, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "annotationId": "a2"},
    ]
    request_body = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
        "regions": regions,
    }

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=two_pages), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_region_text", return_value="text"):
        async with ocr_client() as client:
            async with client.stream("POST", "/ocr/stream", json=request_body) as response:
                assert response.status_code == 200
                # Drain the stream so every per-page observation is recorded.
                async for _ in response.aiter_lines():
                    pass

    # The helper returns (sum, count), in that order.
    total_seconds, observations = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="surya"
    )
    assert observations == 2.0
    assert total_seconds >= 0.0
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_excludes_spell_check_time_in_guided_stream(fresh_metrics):
    """The guided observation must time engine work only, not the spell-check pass.

    Wall-clock bound rather than a structural `patch("main.time.monotonic")`:
    the patched attribute is the *global* `time.monotonic`, which httpx and
    asyncio also consume — they exhaust the deterministic sequence before the
    request reaches the engine loop. Bound is sized against the failure mode,
    not the noise floor: spell-check sleeps 0.05s × 2 regions = 0.1s, so a
    timer that accidentally wrapped `correct_text` would observe >= 0.1s. The
    0.09s ceiling catches that bug while leaving ~90ms of slack for slow CI
    runners (engine work is instantaneous under the mock).
    """
    # A single page carrying two regions (both regions reference pageNumber 1).
    mock_images = [Image.new("RGB", (100, 100))]
    regions = [
        {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"},
        {"pageNumber": 1, "x": 0.5, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a2"},
    ]

    def slow_correct(text):
        # Stand-in for main.correct_text: sleeps 50 ms per call and returns the
        # text unchanged, making spell-check time visible if it were timed.
        import time as _time
        _time.sleep(0.05)
        return text

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.kraken_engine.is_available", return_value=True), \
         patch("main.kraken_engine.extract_region_text", return_value="text"), \
         patch("main.correct_text", side_effect=slow_correct):
        async with ocr_client() as client:
            async with client.stream("POST", "/ocr/stream", json={
                "pdfUrl": "http://minio/doc.pdf",
                "scriptType": "HANDWRITING_KURRENT",
                "language": "de",
                "regions": regions,
            }) as response:
                assert response.status_code == 200
                # Drain the stream so the histogram observation is recorded.
                async for _ in response.aiter_lines():
                    pass

    # The helper returns (sum, count); only the accumulated seconds matter here.
    sum_seconds, _ = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="kraken"
    )
    assert sum_seconds < 0.09, f"timing must exclude spell-check; got sum={sum_seconds}"
@pytest.mark.asyncio
async def test_ocr_jobs_total_not_incremented_when_pdf_download_fails_in_stream(fresh_metrics):
    """A failing `_download_and_convert_pdf` must leave ocr_jobs_total untouched.

    Mirrors the /ocr endpoint's semantics: the counter only records jobs that
    actually started OCR work, not failed downloads.
    """
    async def failing_download(url):
        raise RuntimeError("synthetic download failure")

    request_body = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    with patch("main._download_and_convert_pdf", new=failing_download):
        # raise_app_exceptions=False so the failure comes back as a response.
        async with ocr_client(raise_app_exceptions=False) as client:
            response = await client.post("/ocr/stream", json=request_body)

    assert response.status_code == 500
    surya_jobs = fresh_metrics.ocr_jobs_total.labels(
        engine="surya", script_type="TYPEWRITER"
    )
    assert surya_jobs._value.get() == 0.0
def test_uvicorn_access_log_filter_fails_open_on_short_or_missing_args():
    """Records whose args are None or shorter than expected must pass the filter.

    Locks in fail-open behavior: if uvicorn ever changes its format we keep
    forwarding records to the handler rather than silently dropping logs.
    """
    import logging as _logging
    from main import MetricsPathFilter

    # Fields shared by both synthetic records.
    common = dict(
        name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
        exc_info=None,
    )
    record_without_args = _logging.LogRecord(msg="some message", args=None, **common)
    record_with_short_args = _logging.LogRecord(msg="%s %s", args=("a", "b"), **common)

    path_filter = MetricsPathFilter()
    assert path_filter.filter(record_without_args) is True
    assert path_filter.filter(record_with_short_args) is True
def test_uvicorn_access_log_filter_skips_metrics_path():
    """MetricsPathFilter drops uvicorn.access records for /metrics but keeps /ocr.

    The assertions also expect a GET /health record to be dropped, while a
    POST /ocr record must pass through.
    """
    import logging as _logging
    from main import MetricsPathFilter

    def access_record(method, path):
        # Build a record with the same message format and args tuple for each case.
        return _logging.LogRecord(
            name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
            msg='%s - "%s %s HTTP/%s" %d',
            args=("127.0.0.1:1234", method, path, "1.1", 200),
            exc_info=None,
        )

    path_filter = MetricsPathFilter()
    metrics_record = access_record("GET", "/metrics")
    health_record = access_record("GET", "/health")
    ocr_record = access_record("POST", "/ocr")

    assert path_filter.filter(metrics_record) is False
    assert path_filter.filter(health_record) is False
    assert path_filter.filter(ocr_record) is True