"""Tests for Prometheus metrics exposed by the OCR service. Each test that asserts on a counter/gauge value uses a fresh CollectorRegistry (see decision #3 on issue #652) to keep the metrics isolated between tests. """ import io import zipfile from unittest.mock import AsyncMock, patch import pytest from httpx import ASGITransport, AsyncClient from PIL import Image from prometheus_client import CollectorRegistry from main import app from metrics import build_metrics def _minimal_zip() -> bytes: """Return a ZIP containing one fake .xml so endpoint validation passes.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("page_01.xml", "") return buf.getvalue() def _fake_training_result(accuracy: float = 0.91) -> dict: return {"loss": None, "accuracy": accuracy, "cer": round(1 - accuracy, 4), "epochs": 5} @pytest.fixture def fresh_metrics(monkeypatch): """Replace the module-level `main.metrics` with one bound to a fresh registry.""" registry = CollectorRegistry() test_metrics = build_metrics(registry) monkeypatch.setattr("main.metrics", test_metrics) return test_metrics @pytest.mark.asyncio async def test_metrics_endpoint_returns_200(): """`GET /metrics` returns 200 with Prometheus exposition content.""" with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: response = await client.get("/metrics") assert response.status_code == 200 assert "text/plain" in response.headers.get("content-type", "") @pytest.mark.asyncio async def test_metrics_includes_http_request_metrics_after_ocr_call(): """After a request to /ocr, `/metrics` exposes auto-instrumented http_* metrics.""" mock_images = [Image.new("RGB", (100, 100))] mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "hi", "words": []}] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: # Models need to be loaded for /ocr to accept requests; force the flag. import main as main_module main_module._models_ready = True try: ocr_response = await client.post("/ocr", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", }) assert ocr_response.status_code == 200, ocr_response.text metrics_response = await client.get("/metrics") finally: main_module._models_ready = False body = metrics_response.text assert "http_requests_total" in body assert "http_request_duration_seconds" in body def test_build_metrics_registers_all_custom_metrics_on_given_registry(): """`build_metrics` returns an OcrMetrics bound to the supplied registry.""" registry = CollectorRegistry() metrics = build_metrics(registry) metric_names = {m.name for m in registry.collect()} expected = { "ocr_jobs", "ocr_pages", "ocr_skipped_pages", "ocr_words", "ocr_illegible_words", "ocr_processing_seconds", "ocr_training_runs", "ocr_model_accuracy", "ocr_models_ready", } assert expected <= metric_names, f"missing: {expected - metric_names}" # A second registry yields a separate container — no shared state. other_metrics = build_metrics(CollectorRegistry()) assert metrics is not other_metrics async def _drive_ocr(client: AsyncClient, *, script_type: str) -> None: """Helper — fires /ocr with a single mocked page and asserts a 200.""" response = await client.post("/ocr", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": script_type, "language": "de", }) assert response.status_code == 200, response.text @pytest.mark.asyncio async def test_ocr_jobs_total_incremented_with_kraken_engine_label_for_kurrent(fresh_metrics): """A /ocr call with HANDWRITING_KURRENT increments engine=kraken.""" mock_images = [Image.new("RGB", (100, 100))] mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "hi", "words": []}] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main.correct_text", side_effect=lambda t: t), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.kraken_engine.is_available", return_value=True), \ patch("main.kraken_engine.extract_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: await _drive_ocr(client, script_type="HANDWRITING_KURRENT") finally: main_module._models_ready = False value = fresh_metrics.ocr_jobs_total.labels( engine="kraken", script_type="HANDWRITING_KURRENT" )._value.get() assert value == 1.0 @pytest.mark.asyncio async def test_ocr_jobs_total_incremented_with_surya_engine_label_for_typewriter(fresh_metrics): """A /ocr call with TYPEWRITER increments engine=surya.""" mock_images = [Image.new("RGB", (100, 100))] mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "hi", "words": []}] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: await _drive_ocr(client, script_type="TYPEWRITER") finally: main_module._models_ready = False value = fresh_metrics.ocr_jobs_total.labels( engine="surya", script_type="TYPEWRITER" )._value.get() assert value == 1.0 @pytest.mark.asyncio async def test_ocr_pages_total_incremented_once_per_page_in_stream(fresh_metrics): """The /ocr/stream generator increments ocr_pages_total per successful page.""" mock_images = [Image.new("RGB", (100, 100)) for _ in range(3)] mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "hi", "words": []}] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: async with client.stream("POST", "/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", }) as response: assert response.status_code == 200 # Drain the stream so all per-page increments fire. async for _ in response.aiter_lines(): pass finally: main_module._models_ready = False value = fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() assert value == 3.0 @pytest.mark.asyncio async def test_ocr_skipped_pages_total_incremented_when_engine_raises_for_a_page(fresh_metrics): """When the engine raises on a page, ocr_skipped_pages_total bumps and the stream finishes.""" mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)] good_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "ok", "words": []}] call_count = {"n": 0} def extract_side_effect(*args, **kwargs): call_count["n"] += 1 if call_count["n"] == 1: raise RuntimeError("synthetic engine failure") return good_blocks with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_page_blocks", side_effect=extract_side_effect): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: async with client.stream("POST", "/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", }) as response: assert response.status_code == 200 saw_error = False async for line in response.aiter_lines(): if line and '"type": "error"' in line: saw_error = True assert saw_error finally: main_module._models_ready = False assert fresh_metrics.ocr_skipped_pages_total._value.get() == 1.0 # The second page still succeeds. assert fresh_metrics.ocr_pages_total.labels(engine="surya")._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metrics): """Counters reflect totals summed over every block in the request. Threshold defaults to THRESHOLD_DEFAULT (0.3) for non-Kurrent scripts. Two blocks: 3 words above + 2 words below threshold across blocks. """ mock_images = [Image.new("RGB", (100, 100))] mock_blocks = [ {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "ignored", "words": [{"text": "Lieber", "confidence": 0.9}, {"text": "Freund", "confidence": 0.1}]}, {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "ignored", "words": [{"text": "Gruss", "confidence": 0.8}, {"text": "verschmiert", "confidence": 0.05}, {"text": "Karl", "confidence": 0.95}]}, ] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: await _drive_ocr(client, script_type="TYPEWRITER") finally: main_module._models_ready = False assert fresh_metrics.ocr_words_total._value.get() == 5.0 assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0 def _histogram_count_sum(histogram, **labels) -> tuple[float, float]: """Read the per-label-set _count and _sum from a prometheus_client Histogram.""" child = histogram.labels(**labels) return child._sum.get(), sum(b.get() for b in child._buckets) @pytest.mark.asyncio async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics): """The streaming generator observes ocr_processing_seconds once per page.""" mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)] mock_blocks = [{"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "polygon": None, "text": "ok", "words": []}] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_page_blocks", return_value=mock_blocks): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: async with client.stream("POST", "/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", }) as response: assert response.status_code == 200 async for _ in response.aiter_lines(): pass finally: main_module._models_ready = False sum_seconds, count = _histogram_count_sum( fresh_metrics.ocr_processing_seconds, engine="surya" ) assert count == 2.0 assert sum_seconds >= 0.0 @pytest.mark.asyncio async def test_ocr_training_runs_total_incremented_with_recognition_success_label(fresh_metrics): """/train success increments ocr_training_runs_total{kind=recognition, outcome=success}.""" async def fake_to_thread(func, *args, **kwargs): return _fake_training_result() with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True), \ patch("main.asyncio.to_thread", side_effect=fake_to_thread): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: response = await client.post( "/train", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert response.status_code == 200 assert fresh_metrics.ocr_training_runs_total.labels( kind="recognition", outcome="success" )._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_training_runs_total_incremented_with_recognition_error_label(fresh_metrics): """When /train's inner runner raises, the error counter bumps and the exception propagates.""" async def fake_to_thread(func, *args, **kwargs): raise RuntimeError("ketos train failed (exit 1): synthetic") with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True), \ patch("main.asyncio.to_thread", side_effect=fake_to_thread): transport = ASGITransport(app=app, raise_app_exceptions=False) async with AsyncClient(transport=transport, base_url="http://test") as client: response = await client.post( "/train", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert response.status_code == 500 assert fresh_metrics.ocr_training_runs_total.labels( kind="recognition", outcome="error" )._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_training_runs_total_incremented_with_segmentation_success_label(fresh_metrics): """/segtrain success increments ocr_training_runs_total{kind=segmentation, outcome=success}.""" async def fake_to_thread(func, *args, **kwargs): return _fake_training_result(accuracy=0.83) with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True), \ patch("main.asyncio.to_thread", side_effect=fake_to_thread): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: response = await client.post( "/segtrain", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert response.status_code == 200 assert fresh_metrics.ocr_training_runs_total.labels( kind="segmentation", outcome="success" )._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_training_runs_total_incremented_with_recognition_success_label_for_train_sender(fresh_metrics): """/train-sender success increments ocr_training_runs_total{kind=recognition, outcome=success}.""" async def fake_to_thread(func, *args, **kwargs): return _fake_training_result() with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True), \ patch("main.asyncio.to_thread", side_effect=fake_to_thread): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: response = await client.post( "/train-sender", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, data={"output_model_path": "/app/models/sender_test.mlmodel"}, headers={"X-Training-Token": "secret-token"}, ) assert response.status_code == 200, response.text assert fresh_metrics.ocr_training_runs_total.labels( kind="recognition", outcome="success" )._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_model_accuracy_gauge_stays_default_when_training_returns_no_accuracy(fresh_metrics): """When the runner returns accuracy=None, ocr_model_accuracy must remain at its default 0.""" async def fake_to_thread(func, *args, **kwargs): return {"loss": None, "accuracy": None, "cer": None, "epochs": 5} with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True), \ patch("main.asyncio.to_thread", side_effect=fake_to_thread): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: response = await client.post( "/train", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert response.status_code == 200 # Gauge was never .set() — accessing the label child still creates it with default 0.0. assert fresh_metrics.ocr_model_accuracy.labels( kind="recognition" )._value.get() == 0.0 @pytest.mark.asyncio async def test_ocr_model_accuracy_gauge_set_per_kind_after_successful_training(fresh_metrics): """After /train and /segtrain succeed, ocr_model_accuracy{kind=...} reflects the result.""" recognition_accuracy = 0.917 segmentation_accuracy = 0.834 async def fake_recognition_to_thread(func, *args, **kwargs): return _fake_training_result(accuracy=recognition_accuracy) async def fake_segmentation_to_thread(func, *args, **kwargs): return _fake_training_result(accuracy=segmentation_accuracy) with patch("main.TRAINING_TOKEN", "secret-token"), \ patch("main._models_ready", True): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: with patch("main.asyncio.to_thread", side_effect=fake_recognition_to_thread): rec_resp = await client.post( "/train", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert rec_resp.status_code == 200 with patch("main.asyncio.to_thread", side_effect=fake_segmentation_to_thread): seg_resp = await client.post( "/segtrain", files={"file": ("training.zip", _minimal_zip(), "application/zip")}, headers={"X-Training-Token": "secret-token"}, ) assert seg_resp.status_code == 200 assert fresh_metrics.ocr_model_accuracy.labels(kind="recognition")._value.get() == pytest.approx(recognition_accuracy) assert fresh_metrics.ocr_model_accuracy.labels(kind="segmentation")._value.get() == pytest.approx(segmentation_accuracy) def test_ocr_models_ready_gauge_defaults_to_zero(): """A freshly-built OcrMetrics has ocr_models_ready=0 before lifespan runs.""" metrics = build_metrics(CollectorRegistry()) assert metrics.ocr_models_ready._value.get() == 0.0 @pytest.mark.asyncio async def test_ocr_models_ready_gauge_is_one_after_lifespan_startup(fresh_metrics): """The lifespan flips ocr_models_ready to 1 once load_models / load_spell_checker return. ASGITransport does not run lifespan by default, so the lifespan context manager is driven directly to exercise the startup code path. """ assert fresh_metrics.ocr_models_ready._value.get() == 0.0 with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"): async with app.router.lifespan_context(app): assert fresh_metrics.ocr_models_ready._value.get() == 1.0 @pytest.mark.asyncio async def test_ocr_processing_seconds_histogram_observed_per_page_in_guided_stream(fresh_metrics): """The guided streaming generator observes ocr_processing_seconds once per page.""" mock_images = [Image.new("RGB", (100, 100)) for _ in range(2)] regions = [ {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"}, {"pageNumber": 2, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "annotationId": "a2"}, ] with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.surya_engine.extract_region_text", return_value="text"): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: async with client.stream("POST", "/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", "regions": regions, }) as response: assert response.status_code == 200 async for _ in response.aiter_lines(): pass finally: main_module._models_ready = False sum_seconds, count = _histogram_count_sum( fresh_metrics.ocr_processing_seconds, engine="surya" ) assert count == 2.0 assert sum_seconds >= 0.0 @pytest.mark.asyncio async def test_ocr_processing_seconds_histogram_excludes_spell_check_time_in_guided_stream(fresh_metrics): """The guided observation must time engine work only, not the spell-check pass.""" mock_images = [Image.new("RGB", (100, 100))] regions = [ {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"}, {"pageNumber": 1, "x": 0.5, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a2"}, ] def slow_correct(text): import time as _time _time.sleep(0.05) return text with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \ patch("main.preprocess_page", side_effect=lambda img: img), \ patch("main.kraken_engine.is_available", return_value=True), \ patch("main.kraken_engine.extract_region_text", return_value="text"), \ patch("main.correct_text", side_effect=slow_correct): async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: import main as main_module main_module._models_ready = True try: async with client.stream("POST", "/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "HANDWRITING_KURRENT", "language": "de", "regions": regions, }) as response: assert response.status_code == 200 async for _ in response.aiter_lines(): pass finally: main_module._models_ready = False sum_seconds, _ = _histogram_count_sum( fresh_metrics.ocr_processing_seconds, engine="kraken" ) # Spell-check sleeps 0.05s per region × 2 regions = 0.1s; engine work is instantaneous. # If timing included spell-check, sum_seconds would be >= 0.1s. Allow 30ms slack # for scheduler overhead. assert sum_seconds < 0.05, f"timing must exclude spell-check; got sum={sum_seconds}" @pytest.mark.asyncio async def test_ocr_jobs_total_not_incremented_when_pdf_download_fails_in_stream(fresh_metrics): """If `_download_and_convert_pdf` raises, ocr_jobs_total is NOT incremented. Mirrors the /ocr endpoint's semantics: the counter only records jobs that actually started OCR work, not failed downloads. """ async def fail_download(url): raise RuntimeError("synthetic download failure") with patch("main.kraken_engine.load_models"), \ patch("main.load_spell_checker"), \ patch("main._download_and_convert_pdf", new=fail_download): transport = ASGITransport(app=app, raise_app_exceptions=False) async with AsyncClient(transport=transport, base_url="http://test") as client: import main as main_module main_module._models_ready = True try: response = await client.post("/ocr/stream", json={ "pdfUrl": "http://minio/doc.pdf", "scriptType": "TYPEWRITER", "language": "de", }) finally: main_module._models_ready = False assert response.status_code == 500 assert fresh_metrics.ocr_jobs_total.labels( engine="surya", script_type="TYPEWRITER" )._value.get() == 0.0 def test_uvicorn_access_log_filter_fails_open_on_short_or_missing_args(): """The filter must default-allow records when args is None or shorter than expected. Locks in fail-open behavior: if uvicorn ever changes its format we keep forwarding records to the handler rather than silently dropping logs. """ import logging as _logging from main import MetricsPathFilter filt = MetricsPathFilter() none_record = _logging.LogRecord( name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0, msg="some message", args=None, exc_info=None, ) short_record = _logging.LogRecord( name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0, msg="%s %s", args=("a", "b"), exc_info=None, ) assert filt.filter(none_record) is True assert filt.filter(short_record) is True def test_uvicorn_access_log_filter_skips_metrics_path(): """The MetricsPathFilter drops uvicorn.access log records that target /metrics.""" import logging as _logging from main import MetricsPathFilter filt = MetricsPathFilter() metrics_record = _logging.LogRecord( name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0, msg='%s - "%s %s HTTP/%s" %d', args=("127.0.0.1:1234", "GET", "/metrics", "1.1", 200), exc_info=None, ) health_record = _logging.LogRecord( name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0, msg='%s - "%s %s HTTP/%s" %d', args=("127.0.0.1:1234", "GET", "/health", "1.1", 200), exc_info=None, ) ocr_record = _logging.LogRecord( name="uvicorn.access", level=_logging.INFO, pathname="", lineno=0, msg='%s - "%s %s HTTP/%s" %d', args=("127.0.0.1:1234", "POST", "/ocr", "1.1", 200), exc_info=None, ) assert filt.filter(metrics_record) is False assert filt.filter(health_record) is False assert filt.filter(ocr_record) is True