Files
familienarchiv/ocr-service/test_metrics.py
Marcel 549cb15845 test(ocr): cover /train-sender counter and accuracy=None gauge default
Two regression tests:
- /train-sender hitting the success path bumps the recognition counter
  (previously only /train and /segtrain were covered).
- A successful run whose result.accuracy is None must not call set() on
  ocr_model_accuracy — the gauge stays at its default 0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 16:53:48 +02:00

636 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for Prometheus metrics exposed by the OCR service.
Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry
(see decision #3 on issue #652) to keep the metrics isolated between tests.
"""
import io
import zipfile
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from PIL import Image
from prometheus_client import CollectorRegistry
from main import app
from metrics import build_metrics
def _minimal_zip() -> bytes:
"""Return a ZIP containing one fake .xml so endpoint validation passes."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("page_01.xml", "<PcGts/>")
return buf.getvalue()
def _fake_training_result(accuracy: float = 0.91) -> dict:
return {"loss": None, "accuracy": accuracy, "cer": round(1 - accuracy, 4), "epochs": 5}
@pytest.fixture
def fresh_metrics(monkeypatch):
"""Replace the module-level `main.metrics` with one bound to a fresh registry."""
registry = CollectorRegistry()
test_metrics = build_metrics(registry)
monkeypatch.setattr("main.metrics", test_metrics)
return test_metrics
@pytest.mark.asyncio
async def test_metrics_endpoint_returns_200():
"""`GET /metrics` returns 200 with Prometheus exposition content."""
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get("/metrics")
assert response.status_code == 200
assert "text/plain" in response.headers.get("content-type", "")
@pytest.mark.asyncio
async def test_metrics_includes_http_request_metrics_after_ocr_call():
"""After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics."""
mock_images = [Image.new("RGB", (100, 100))]
mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "hi", "words": []}]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
# Models need to be loaded for /ocr to accept requests; force the flag.
import main as main_module
main_module._models_ready = True
try:
ocr_response = await client.post("/ocr", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
})
assert ocr_response.status_code == 200, ocr_response.text
metrics_response = await client.get("/metrics")
finally:
main_module._models_ready = False
body = metrics_response.text
assert "http_requests_total" in body
assert "http_request_duration_seconds" in body
def test_build_metrics_registers_all_custom_metrics_on_given_registry():
"""`build_metrics` returns an OcrMetrics bound to the supplied registry."""
registry = CollectorRegistry()
metrics = build_metrics(registry)
metric_names = {m.name for m in registry.collect()}
expected = {
"ocr_jobs",
"ocr_pages",
"ocr_skipped_pages",
"ocr_words",
"ocr_illegible_words",
"ocr_processing_seconds",
"ocr_training_runs",
"ocr_model_accuracy",
"ocr_models_ready",
}
assert expected <= metric_names, f"missing: {expected - metric_names}"
# A second registry yields a separate container — no shared state.
other_metrics = build_metrics(CollectorRegistry())
assert metrics is not other_metrics
async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None:
"""Helper — fires /ocr with a single mocked page and asserts a 200."""
response = await client.post("/ocr", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": script_type,
"language": "de",
})
assert response.status_code == 200, response.text
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics):
"""A /ocr call with HANDWRITING_KURRENT increments engine=kraken."""
mock_images = [Image.new("RGB", (100, 100))]
mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "hi", "words": []}]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main.correct_text", side_effect=lambda t: t), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.kraken_engine.is_available", return_value=True), \
patch("main.kraken_engine.extract_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
await _drive_ocr(client, script_type="HANDWRITING_KURRENT")
finally:
main_module._models_ready = False
value = fresh_metrics.ocr_jobs_total.labels(
engine="kraken", script_type="HANDWRITING_KURRENT"
)._value.get()
assert value == 1.0
@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics):
"""A /ocr call with TYPEWRITER increments engine=surya."""
mock_images = [Image.new("RGB", (100, 100))]
mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "hi", "words": []}]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
await _drive_ocr(client, script_type="TYPEWRITER")
finally:
main_module._models_ready = False
value = fresh_metrics.ocr_jobs_total.labels(
engine="surya", script_type="TYPEWRITER"
)._value.get()
assert value == 1.0
@pytest.mark.asyncio
async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics):
"""The /ocr/stream generator increments ocr_pages_total per successful page."""
mock_images = [Image.new("RGB", (100, 100)) for _ in range(3)]
mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "hi", "words": []}]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
async with client.stream("POST", "/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
}) as response:
assert response.status_code == 200
# Drain the stream so all per-page increments fire.
async for _ in response.aiter_lines():
pass
finally:
main_module._models_ready = False
value = fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get()
assert value == 3.0
@pytest.mark.asyncio
async def test_ocr_skipped_pages_total_incremented_when_engine_raises_for_a_page(fresh_metrics):
"""When the engine raises on a page, ocr_skipped_pages_total bumps and the stream finishes."""
mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
good_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "ok", "words": []}]
call_count = {"n": 0}
def extract_side_effect(*args, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
raise RuntimeError("synthetic engine failure")
return good_blocks
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_page_blocks", side_effect=extract_side_effect):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
async with client.stream("POST", "/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
}) as response:
assert response.status_code == 200
saw_error = False
async for line in response.aiter_lines():
if line and '"type": "error"' in line:
saw_error = True
assert saw_error
finally:
main_module._models_ready = False
assert fresh_metrics.ocr_skipped_pages_total._value.get() == 1.0
# The second page still succeeds.
assert fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metrics):
"""Counters reflect totals summed over every block in the request.
Threshold defaults to THRESHOLD_DEFAULT (0.3) for non-Kurrent scripts. Two
blocks: 3 words above + 2 words below threshold across blocks.
"""
mock_images = [Image.new("RGB", (100, 100))]
mock_blocks = [
{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "ignored",
"words": [{"text": "Lieber", "confidence": 0.9},
{"text": "Freund", "confidence": 0.1}]},
{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "ignored",
"words": [{"text": "Gruss", "confidence": 0.8},
{"text": "verschmiert", "confidence": 0.05},
{"text": "Karl", "confidence": 0.95}]},
]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
await _drive_ocr(client, script_type="TYPEWRITER")
finally:
main_module._models_ready = False
assert fresh_metrics.ocr_words_total._value.get() == 5.0
assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0
def _histogram_count_sum(histogram, **labels) -> tuple[float, float]:
"""Read the per-label-set _count and _sum from a prometheus_client Histogram."""
child = histogram.labels(**labels)
return child._sum.get(), sum(b.get() for b in child._buckets)
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics):
"""The streaming generator observes ocr_processing_seconds once per page."""
mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0,
"polygon": None, "text": "ok", "words": []}]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
async with client.stream("POST", "/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
}) as response:
assert response.status_code == 200
async for _ in response.aiter_lines():
pass
finally:
main_module._models_ready = False
sum_seconds, count = _histogram_count_sum(
fresh_metrics.ocr_processing_seconds, engine="surya"
)
assert count == 2.0
assert sum_seconds >= 0.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label(fresh_metrics):
"""/train success increments ocr_training_runs_total{kind=recognition, outcome=success}."""
async def fake_to_thread(func, *args, **kwargs):
return _fake_training_result()
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True), \
patch("main.asyncio.to_thread", side_effect=fake_to_thread):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/train",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert response.status_code == 200
assert fresh_metrics.ocr_training_runs_total.labels(
kind="recognition", outcome="success"
)._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_error_label(fresh_metrics):
"""When /train's inner runner raises, the error counter bumps and the exception propagates."""
async def fake_to_thread(func, *args, **kwargs):
raise RuntimeError("ketos train failed (exit 1): synthetic")
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True), \
patch("main.asyncio.to_thread", side_effect=fake_to_thread):
transport = ASGITransport(app=app, raise_app_exceptions=False)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/train",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert response.status_code == 500
assert fresh_metrics.ocr_training_runs_total.labels(
kind="recognition", outcome="error"
)._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_segmentation_success_label(fresh_metrics):
"""/segtrain success increments ocr_training_runs_total{kind=segmentation, outcome=success}."""
async def fake_to_thread(func, *args, **kwargs):
return _fake_training_result(accuracy=0.83)
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True), \
patch("main.asyncio.to_thread", side_effect=fake_to_thread):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/segtrain",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert response.status_code == 200
assert fresh_metrics.ocr_training_runs_total.labels(
kind="segmentation", outcome="success"
)._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_training_runs_total_incremented_with_recognition_success_label_for_train_sender(fresh_metrics):
"""/train-sender success increments ocr_training_runs_total{kind=recognition, outcome=success}."""
async def fake_to_thread(func, *args, **kwargs):
return _fake_training_result()
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True), \
patch("main.asyncio.to_thread", side_effect=fake_to_thread):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/train-sender",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
data={"output_model_path": "/app/models/sender_test.mlmodel"},
headers={"X-Training-Token": "secret-token"},
)
assert response.status_code == 200, response.text
assert fresh_metrics.ocr_training_runs_total.labels(
kind="recognition", outcome="success"
)._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_stays_default_when_training_returns_no_accuracy(fresh_metrics):
"""When the runner returns accuracy=None, ocr_model_accuracy must remain at its default 0."""
async def fake_to_thread(func, *args, **kwargs):
return {"loss": None, "accuracy": None, "cer": None, "epochs": 5}
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True), \
patch("main.asyncio.to_thread", side_effect=fake_to_thread):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/train",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert response.status_code == 200
# Gauge was never .set() — accessing the label child still creates it with default 0.0.
assert fresh_metrics.ocr_model_accuracy.labels(
kind="recognition"
)._value.get() == 0.0
@pytest.mark.asyncio
async def test_ocr_model_accuracy_gauge_set_per_kind_after_successful_training(fresh_metrics):
"""After /train and /segtrain succeed, ocr_model_accuracy{kind=...} reflects the result."""
recognition_accuracy = 0.917
segmentation_accuracy = 0.834
async def fake_recognition_to_thread(func, *args, **kwargs):
return _fake_training_result(accuracy=recognition_accuracy)
async def fake_segmentation_to_thread(func, *args, **kwargs):
return _fake_training_result(accuracy=segmentation_accuracy)
with patch("main.TRAINING_TOKEN", "secret-token"), \
patch("main._models_ready", True):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
with patch("main.asyncio.to_thread", side_effect=fake_recognition_to_thread):
rec_resp = await client.post(
"/train",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert rec_resp.status_code == 200
with patch("main.asyncio.to_thread", side_effect=fake_segmentation_to_thread):
seg_resp = await client.post(
"/segtrain",
files={"file": ("training.zip", _minimal_zip(), "application/zip")},
headers={"X-Training-Token": "secret-token"},
)
assert seg_resp.status_code == 200
assert fresh_metrics.ocr_model_accuracy.labels(kind="recognition")._value.get() == pytest.approx(recognition_accuracy)
assert fresh_metrics.ocr_model_accuracy.labels(kind="segmentation")._value.get() == pytest.approx(segmentation_accuracy)
def test_ocr_models_ready_gauge_defaults_to_zero():
"""A freshly-built OcrMetrics has ocr_models_ready=0 before lifespan runs."""
metrics = build_metrics(CollectorRegistry())
assert metrics.ocr_models_ready._value.get() == 0.0
@pytest.mark.asyncio
async def test_ocr_models_ready_gauge_is_one_after_lifespan_startup(fresh_metrics):
"""The lifespan flips ocr_models_ready to 1 once load_models / load_spell_checker return.
ASGITransport does not run lifespan by default, so the lifespan context
manager is driven directly to exercise the startup code path.
"""
assert fresh_metrics.ocr_models_ready._value.get() == 0.0
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"):
async with app.router.lifespan_context(app):
assert fresh_metrics.ocr_models_ready._value.get() == 1.0
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_guided_stream(fresh_metrics):
"""The guided streaming generator observes ocr_processing_seconds once per page."""
mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)]
regions = [
{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"},
{"pageNumber": 2, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "annotationId": "a2"},
]
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.surya_engine.extract_region_text", return_value="text"):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
async with client.stream("POST", "/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
"regions": regions,
}) as response:
assert response.status_code == 200
async for _ in response.aiter_lines():
pass
finally:
main_module._models_ready = False
sum_seconds, count = _histogram_count_sum(
fresh_metrics.ocr_processing_seconds, engine="surya"
)
assert count == 2.0
assert sum_seconds >= 0.0
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_excludes_spell_check_time_in_guided_stream(fresh_metrics):
"""The guided observation must time engine work only, not the spell-check pass."""
mock_images = [Image.new("RGB", (100, 100))]
regions = [
{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"},
{"pageNumber": 1, "x": 0.5, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a2"},
]
def slow_correct(text):
import time as _time
_time.sleep(0.05)
return text
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
patch("main.preprocess_page", side_effect=lambda img: img), \
patch("main.kraken_engine.is_available", return_value=True), \
patch("main.kraken_engine.extract_region_text", return_value="text"), \
patch("main.correct_text", side_effect=slow_correct):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
async with client.stream("POST", "/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "HANDWRITING_KURRENT",
"language": "de",
"regions": regions,
}) as response:
assert response.status_code == 200
async for _ in response.aiter_lines():
pass
finally:
main_module._models_ready = False
sum_seconds, _ = _histogram_count_sum(
fresh_metrics.ocr_processing_seconds, engine="kraken"
)
# Spell-check sleeps 0.05s per region × 2 regions = 0.1s; engine work is instantaneous.
# If timing included spell-check, sum_seconds would be >= 0.1s. Allow 30ms slack
# for scheduler overhead.
assert sum_seconds < 0.05, f"timing must exclude spell-check; got sum={sum_seconds}"
@pytest.mark.asyncio
async def test_ocr_jobs_total_not_incremented_when_pdf_download_fails_in_stream(fresh_metrics):
"""If `_download_and_convert_pdf` raises, ocr_jobs_total is NOT incremented.
Mirrors the /ocr endpoint's semantics: the counter only records jobs that
actually started OCR work, not failed downloads.
"""
async def fail_download(url):
raise RuntimeError("synthetic download failure")
with patch("main.kraken_engine.load_models"), \
patch("main.load_spell_checker"), \
patch("main._download_and_convert_pdf", new=fail_download):
transport = ASGITransport(app=app, raise_app_exceptions=False)
async with AsyncClient(transport=transport, base_url="http://test") as client:
import main as main_module
main_module._models_ready = True
try:
response = await client.post("/ocr/stream", json={
"pdfUrl": "http://minio/doc.pdf",
"scriptType": "TYPEWRITER",
"language": "de",
})
finally:
main_module._models_ready = False
assert response.status_code == 500
assert fresh_metrics.ocr_jobs_total.labels(
engine="surya", script_type="TYPEWRITER"
)._value.get() == 0.0
def test_uvicorn_access_log_filter_skips_metrics_path():
"""The MetricsPathFilter drops uvicorn.access log records that target /metrics."""
import logging as _logging
from main import MetricsPathFilter
filt = MetricsPathFilter()
metrics_record = _logging.LogRecord(
name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
msg='%s - "%s %s HTTP/%s" %d',
args=("127.0.0.1:1234", "GET", "/metrics", "1.1", 200),
exc_info=None,
)
health_record = _logging.LogRecord(
name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
msg='%s - "%s %s HTTP/%s" %d',
args=("127.0.0.1:1234", "GET", "/health", "1.1", 200),
exc_info=None,
)
ocr_record = _logging.LogRecord(
name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0,
msg='%s - "%s %s HTTP/%s" %d',
args=("127.0.0.1:1234", "POST", "/ocr", "1.1", 200),
exc_info=None,
)
assert filt.filter(metrics_record) is False
assert filt.filter(health_record) is False
assert filt.filter(ocr_record) is True