feat(ocr): observe ocr_processing_seconds around asyncio.to_thread engine calls

Wraps every asyncio.to_thread(engine.extract_*) call with time.monotonic()
deltas in /ocr (per document) and in both /ocr/stream generators (per page).
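The per-page observations from the streaming generators are the useful
operational signal; the per-document /ocr observation is a bonus and lands
in the same histogram under the same engine label.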

Refs #652 (AC5)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Marcel
2026-05-21 16:09:25 +02:00
parent 131ed336bc
commit 2e3744d9ef
2 changed files with 53 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import re
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
import time
import zipfile import zipfile
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -108,6 +109,7 @@ async def run_ocr(request: OcrRequest):
script_type = request.scriptType.upper() script_type = request.scriptType.upper()
engine_name = "kraken" if script_type == "HANDWRITING_KURRENT" else "surya" engine_name = "kraken" if script_type == "HANDWRITING_KURRENT" else "surya"
extract_started = time.monotonic()
if script_type == "HANDWRITING_KURRENT": if script_type == "HANDWRITING_KURRENT":
if not kraken_engine.is_available(): if not kraken_engine.is_available():
raise HTTPException( raise HTTPException(
@@ -119,6 +121,9 @@ async def run_ocr(request: OcrRequest):
else: else:
# TYPEWRITER, HANDWRITING_LATIN, UNKNOWN — all use Surya # TYPEWRITER, HANDWRITING_LATIN, UNKNOWN — all use Surya
blocks = await asyncio.to_thread(surya_engine.extract_blocks, images, request.language) blocks = await asyncio.to_thread(surya_engine.extract_blocks, images, request.language)
metrics.ocr_processing_seconds.labels(engine=engine_name).observe(
time.monotonic() - extract_started
)
metrics.ocr_jobs_total.labels(engine=engine_name, script_type=script_type).inc() metrics.ocr_jobs_total.labels(engine=engine_name, script_type=script_type).inc()
@@ -194,6 +199,7 @@ async def run_ocr_stream(request: OcrRequest):
image = await asyncio.to_thread(preprocess_page, image) image = await asyncio.to_thread(preprocess_page, image)
blocks = [] blocks = []
sender_path = request.senderModelPath if use_kraken else None sender_path = request.senderModelPath if use_kraken else None
page_started = time.monotonic()
for region in page_regions: for region in page_regions:
text = await asyncio.to_thread( text = await asyncio.to_thread(
engine.extract_region_text, image, engine.extract_region_text, image,
@@ -213,6 +219,9 @@ async def run_ocr_stream(request: OcrRequest):
"annotationId": region.annotationId, "annotationId": region.annotationId,
}) })
metrics.ocr_processing_seconds.labels(engine=engine_name).observe(
time.monotonic() - page_started
)
total_blocks += len(blocks) total_blocks += len(blocks)
metrics.ocr_pages_total.labels(engine=engine_name).inc() metrics.ocr_pages_total.labels(engine=engine_name).inc()
yield json.dumps({ yield json.dumps({
@@ -258,9 +267,13 @@ async def run_ocr_stream(request: OcrRequest):
yield json.dumps({"type": "preprocessing", "pageNumber": page_idx}) + "\n" yield json.dumps({"type": "preprocessing", "pageNumber": page_idx}) + "\n"
image = await asyncio.to_thread(preprocess_page, image) image = await asyncio.to_thread(preprocess_page, image)
sender_path = request.senderModelPath if use_kraken else None sender_path = request.senderModelPath if use_kraken else None
page_started = time.monotonic()
blocks = await asyncio.to_thread( blocks = await asyncio.to_thread(
engine.extract_page_blocks, image, page_idx, request.language, sender_path engine.extract_page_blocks, image, page_idx, request.language, sender_path
) )
metrics.ocr_processing_seconds.labels(engine=engine_name).observe(
time.monotonic() - page_started
)
for block in blocks: for block in blocks:
words = block.get("words") or [] words = block.get("words") or []

View File

@@ -267,3 +267,43 @@ async def test_ocr_words_and_illegible_words_total_sum_across_blocks(fresh_metri
assert fresh_metrics.ocr_words_total._value.get() == 5.0 assert fresh_metrics.ocr_words_total._value.get() == 5.0
assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0 assert fresh_metrics.ocr_illegible_words_total._value.get() == 2.0
def _histogram_count_sum(histogram, **labels) -> tuple[float, float]:
    """Read the observed sum and the observation count for one label set.

    Despite the helper's name, the result is ordered ``(sum, count)``;
    unpack it accordingly.

    Args:
        histogram: A prometheus_client ``Histogram`` (any object whose
            ``labels(**labels)`` child exposes ``_sum`` and ``_buckets``
            works).
        **labels: Label names and values selecting the child to read.

    Returns:
        A ``(sum_of_observed_values, observation_count)`` tuple.
    """
    child = histogram.labels(**labels)
    observed_sum = child._sum.get()
    # The count is derived by adding up the per-bucket values; this relies on
    # prometheus_client keeping its internal buckets non-cumulative.
    observation_count = sum(bucket.get() for bucket in child._buckets)
    return observed_sum, observation_count
@pytest.mark.asyncio
async def test_ocr_processing_seconds_histogram_observed_per_page_in_stream(fresh_metrics):
    """Streaming a two-page document yields two ocr_processing_seconds samples.

    The PDF download, page preprocessing and Surya page extraction are
    patched out, so only the generator's own timing/observation logic runs.
    The response is drained completely before the histogram is inspected.
    """
    fake_pages = [Image.new("RGB", (100, 100)) for _ in range(2)]
    fake_blocks = [{
        "pageNumber": 1,
        "x": 0.0,
        "y": 0.0,
        "width": 1.0,
        "height": 1.0,
        "polygon": None,
        "text": "ok",
        "words": [],
    }]
    # TYPEWRITER is expected to route to the Surya engine, whose label is
    # asserted on below.
    payload = {
        "pdfUrl": "http://minio/doc.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }

    with patch("main.kraken_engine.load_models"), \
         patch("main.load_spell_checker"), \
         patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=fake_pages), \
         patch("main.preprocess_page", side_effect=lambda img: img), \
         patch("main.surya_engine.extract_page_blocks", return_value=fake_blocks):
        import main as main_module

        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            # Flip the module-level readiness flag for the duration of the request.
            main_module._models_ready = True
            try:
                async with client.stream("POST", "/ocr/stream", json=payload) as response:
                    assert response.status_code == 200
                    # Consume every line so the generator runs to completion.
                    async for _line in response.aiter_lines():
                        pass
            finally:
                main_module._models_ready = False

    observed_sum, observation_count = _histogram_count_sum(
        fresh_metrics.ocr_processing_seconds, engine="surya"
    )
    assert observation_count == 2.0
    assert observed_sum >= 0.0