"""Tests for Prometheus metrics exposed by the OCR service.

Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry
(see decision #3 on issue #652) to keep the metrics isolated between tests.
"""
from contextlib import ExitStack, contextmanager
from unittest.mock import AsyncMock, patch

import pytest
from httpx import ASGITransport, AsyncClient
from PIL import Image
from prometheus_client import CollectorRegistry

import main as main_module
from main import app
from metrics import build_metrics


@pytest.fixture
def fresh_metrics(monkeypatch):
    """Replace the module-level `main.metrics` with one bound to a fresh registry."""
    registry = CollectorRegistry()
    test_metrics = build_metrics(registry)
    monkeypatch.setattr("main.metrics", test_metrics)
    return test_metrics


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _blank_pages(count: int = 1) -> list:
    """Return `count` blank 100x100 RGB images standing in for rendered PDF pages."""
    return [Image.new("RGB", (100, 100)) for _ in range(count)]


def _single_block() -> list:
    """Return a one-element block list used as the mocked engine output."""
    return [{
        "pageNumber": 1,
        "x": 0.0,
        "y": 0.0,
        "width": 1.0,
        "height": 1.0,
        "polygon": None,
        "text": "hi",
        "words": [],
    }]


def _ocr_payload(script_type: str) -> dict:
    """Build the JSON body posted to `/ocr` and `/ocr/stream`."""
    return {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": script_type,
        "language": "de",
    }


def _new_client() -> AsyncClient:
    """Create an in-process HTTP client bound to the ASGI `app`."""
    return AsyncClient(transport=ASGITransport(app=app), base_url="http://test")


def _models_forced_ready():
    """Return a context manager that sets `main._models_ready` to True.

    Models need to be loaded for /ocr to accept requests, so the tests force
    the flag. `patch.object` puts back whatever value the flag had on entry,
    instead of unconditionally resetting it to False.
    """
    return patch.object(main_module, "_models_ready", True)


@contextmanager
def _patched_pipeline(pages, *engine_patches):
    """Stub out model loading, PDF download and page preprocessing.

    Args:
        pages: Images returned by the mocked `main._download_and_convert_pdf`.
        *engine_patches: Additional un-started `patch(...)` objects (engine
            specific) that are entered alongside the common ones.
    """
    common_patches = (
        patch("main.kraken_engine.load_models"),
        patch("main.load_spell_checker"),
        patch(
            "main._download_and_convert_pdf",
            new_callable=AsyncMock,
            return_value=pages,
        ),
        patch("main.preprocess_page", side_effect=lambda img: img),
    )
    # ExitStack unwinds every entered patch, even if a later one fails to start.
    with ExitStack() as stack:
        for pending in (*common_patches, *engine_patches):
            stack.enter_context(pending)
        yield


def _counter_value(counter, **labels) -> float:
    """Read the current value of the labelled child of `counter`.

    This goes through the private `_value` attribute, exactly as the inline
    assertions used to; keeping it here confines that access to one place.
    """
    return counter.labels(**labels)._value.get()


async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None:
    """Helper — fires /ocr with a single mocked page and asserts a 200."""
    response = await client.post("/ocr", json=_ocr_payload(script_type))
    assert response.status_code == 200, response.text


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_metrics_endpoint_returns_200():
    """`GET /metrics` returns 200 with Prometheus exposition content."""
    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"):
        async with _new_client() as client:
            response = await client.get("/metrics")
            assert response.status_code == 200
            assert "text/plain" in response.headers.get("content-type", "")


@pytest.mark.asyncio
async def test_metrics_includes_http_request_metrics_after_ocr_call():
    """After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics."""
    with _patched_pipeline(
        _blank_pages(),
        patch("main.surya_engine.extract_blocks", return_value=_single_block()),
    ):
        async with _new_client() as client:
            with _models_forced_ready():
                await _drive_ocr(client, script_type="TYPEWRITER")
                metrics_response = await client.get("/metrics")

    body = metrics_response.text
    assert "http_requests_total" in body
    assert "http_request_duration_seconds" in body


def test_build_metrics_registers_all_custom_metrics_on_given_registry():
    """`build_metrics` returns an OcrMetrics bound to the supplied registry."""
    registry = CollectorRegistry()
    metrics = build_metrics(registry)

    metric_names = {m.name for m in registry.collect()}
    expected = {
        "ocr_jobs",
        "ocr_pages",
        "ocr_skipped_pages",
        "ocr_words",
        "ocr_illegible_words",
        "ocr_processing_seconds",
        "ocr_training_runs",
        "ocr_model_accuracy",
        "ocr_models_ready",
    }
    assert expected <= metric_names, f"missing: {expected - metric_names}"

    # A second registry yields a separate container — no shared state.
    other_metrics = build_metrics(CollectorRegistry())
    assert metrics is not other_metrics


@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics):
    """A /ocr call with HANDWRITING_KURRENT increments engine=kraken."""
    with _patched_pipeline(
        _blank_pages(),
        patch("main.correct_text", side_effect=lambda t: t),
        patch("main.kraken_engine.is_available", return_value=True),
        patch("main.kraken_engine.extract_blocks", return_value=_single_block()),
    ):
        async with _new_client() as client:
            with _models_forced_ready():
                await _drive_ocr(client, script_type="HANDWRITING_KURRENT")

    value = _counter_value(
        fresh_metrics.ocr_jobs_total,
        engine="kraken",
        script_type="HANDWRITING_KURRENT",
    )
    assert value == 1.0


@pytest.mark.asyncio
async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics):
    """A /ocr call with TYPEWRITER increments engine=surya."""
    with _patched_pipeline(
        _blank_pages(),
        patch("main.surya_engine.extract_blocks", return_value=_single_block()),
    ):
        async with _new_client() as client:
            with _models_forced_ready():
                await _drive_ocr(client, script_type="TYPEWRITER")

    value = _counter_value(
        fresh_metrics.ocr_jobs_total,
        engine="surya",
        script_type="TYPEWRITER",
    )
    assert value == 1.0


@pytest.mark.asyncio
async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics):
    """The /ocr/stream generator increments ocr_pages_total per successful page."""
    with _patched_pipeline(
        _blank_pages(3),
        patch("main.surya_engine.extract_page_blocks", return_value=_single_block()),
    ):
        async with _new_client() as client:
            with _models_forced_ready():
                async with client.stream(
                    "POST", "/ocr/stream", json=_ocr_payload("TYPEWRITER")
                ) as response:
                    assert response.status_code == 200
                    # Drain the stream so all per-page increments fire.
                    async for _ in response.aiter_lines():
                        pass

    value = _counter_value(fresh_metrics.ocr_pages_total, engine="surya")
    assert value == 3.0