Files
familienarchiv/ocr-service/test_stream.py
2026-04-17 14:16:47 +02:00

384 lines
16 KiB
Python

"""Tests for the NDJSON streaming OCR endpoint POST /ocr/stream, plus checks of the older POST /ocr endpoint and GET /health."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from main import app
@pytest.fixture
def mock_images():
    """Provide three blank 100x200 RGB PIL images used as fake converted PDF pages."""
    # Imported lazily so PIL is only loaded by tests that request this fixture.
    from PIL import Image

    page_size = (100, 200)
    images = []
    for _ in range(3):
        images.append(Image.new("RGB", page_size))
    return images
def _make_block(page_idx, text="Test"):
return {
"pageNumber": page_idx,
"x": 0.1, "y": 0.2, "width": 0.8, "height": 0.1,
"polygon": None, "text": text,
"words": [{"text": text, "confidence": 0.95}],
}
# ─── P3: start event with total pages ────────────────────────────────────────
@pytest.mark.asyncio
async def test_ocr_stream_emits_start_event_with_total_pages(mock_images):
    """The first NDJSON line is a start event carrying the number of pages."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
        "language": "de",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images):
        with patch("main._models_ready", True), patch("main.surya_engine") as mock_surya:
            mock_surya.extract_page_blocks.return_value = [_make_block(0)]
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post("/ocr/stream", json=request_body)

    # One JSON document per non-blank line of the response body.
    events = []
    for raw_line in response.text.strip().split("\n"):
        if raw_line.strip():
            events.append(json.loads(raw_line))
    assert events[0] == {"type": "start", "totalPages": 3}
# ─── P4: page events per completed page ──────────────────────────────────────
@pytest.mark.asyncio
async def test_ocr_stream_emits_page_event_per_page_with_blocks(mock_images):
    """Three input pages yield three page events numbered 1..3, each with its blocks."""

    def fake_extract(image, page_idx, language="de"):
        # One block per page, tagged with the page index it was produced for.
        return [_make_block(page_idx, f"Page {page_idx}")]

    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya:
        mock_surya.extract_page_blocks.side_effect = fake_extract
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json=request_body)

    raw_lines = response.text.strip().split("\n")
    events = [json.loads(raw) for raw in raw_lines if raw.strip()]
    page_events = [event for event in events if event["type"] == "page"]

    assert len(page_events) == 3
    first_page, second_page, third_page = page_events[0], page_events[1], page_events[2]
    assert first_page["pageNumber"] == 1
    assert second_page["pageNumber"] == 2
    assert third_page["pageNumber"] == 3
    assert len(first_page["blocks"]) == 1
# ─── P5: done event with total blocks and skipped ────────────────────────────
@pytest.mark.asyncio
async def test_ocr_stream_emits_done_with_total_blocks(mock_images):
    """The done event reports the block total across all pages and zero skipped pages."""

    def page_blocks(image, page_idx, language="de"):
        # Two *distinct* block dicts per page. ``[block] * 2`` would put the same
        # dict in the list twice, so any in-place post-processing done by the
        # service would be applied to one shared object.
        return [_make_block(page_idx) for _ in range(2)]

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya:
        mock_surya.extract_page_blocks.side_effect = page_blocks
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json={
                "pdfUrl": "http://minio/test.pdf",
                "scriptType": "TYPEWRITER",
            })

    events = [json.loads(line) for line in response.text.strip().split("\n") if line.strip()]
    done = [event for event in events if event["type"] == "done"][0]
    # 3 pages x 2 blocks each.
    assert done["totalBlocks"] == 6
    assert done["skippedPages"] == 0
# ─── P6: error event on page failure, continues ──────────────────────────────
@pytest.mark.asyncio
async def test_ocr_stream_emits_error_event_on_page_failure_and_continues(mock_images):
    """A failing page yields a generic error event; the remaining pages are still processed."""

    def page_blocks(image, page_idx, language="de"):
        # Simulate an engine crash for exactly one of the three pages.
        if page_idx == 2:
            raise RuntimeError("Engine crashed on page 2")
        return [_make_block(page_idx)]

    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya:
        mock_surya.extract_page_blocks.side_effect = page_blocks
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json={
                "pdfUrl": "http://minio/test.pdf",
                "scriptType": "TYPEWRITER",
            })

    events = [json.loads(line) for line in response.text.strip().split("\n") if line.strip()]
    types = [event["type"] for event in events]
    assert "error" in types
    error_event = [event for event in events if event["type"] == "error"][0]
    assert error_event["pageNumber"] == 2
    # Error message must be generic, not the raw exception text
    assert "Engine crashed" not in error_event["message"]
    # Processing continued past the failure: the other two pages still produce page events
    page_events = [event for event in events if event["type"] == "page"]
    assert len(page_events) == 2
    done = [event for event in events if event["type"] == "done"][0]
    assert done["skippedPages"] == 1
# ─── P7: old /ocr endpoint still works ───────────────────────────────────────
@pytest.mark.asyncio
async def test_old_ocr_endpoint_still_returns_flat_list(mock_images):
    """POST /ocr answers 200 with a plain JSON list holding the mocked blocks."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images):
        with patch("main._models_ready", True):
            with patch("main.surya_engine") as mock_surya:
                mock_surya.extract_blocks.return_value = [_make_block(1), _make_block(2)]
                transport = ASGITransport(app=app)
                async with AsyncClient(transport=transport, base_url="http://test") as client:
                    response = await client.post("/ocr", json=request_body)

    assert response.status_code == 200
    blocks = response.json()
    assert isinstance(blocks, list)
    assert len(blocks) == 2
    assert blocks[0]["pageNumber"] == 1
# ─── Health and error handling ────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_health_returns_ok_when_models_ready():
    """GET /health answers 200 with status "ok" and reports surya and kraken as true."""
    with patch("main._models_ready", True):
        with patch("main.kraken_engine") as mock_kraken:
            mock_kraken.is_available.return_value = True
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.get("/health")

    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
    assert body["surya"] is True
    assert body["kraken"] is True
@pytest.mark.asyncio
async def test_health_returns_503_when_models_not_ready():
    """GET /health answers 503 while the models-ready flag is false."""
    transport = ASGITransport(app=app)
    with patch("main._models_ready", False):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get("/health")

    status = response.status_code
    assert status == 503
@pytest.mark.asyncio
async def test_ocr_stream_returns_503_when_models_not_ready():
    """POST /ocr/stream answers 503 while the models-ready flag is false."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    transport = ASGITransport(app=app)
    with patch("main._models_ready", False):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/ocr/stream", json=request_body)

    status = response.status_code
    assert status == 503
@pytest.mark.asyncio
async def test_ocr_stream_uses_kraken_for_kurrent_script(mock_images):
    """A HANDWRITING_KURRENT request is served through the (mocked) kraken engine."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "HANDWRITING_KURRENT",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images):
        with patch("main._models_ready", True), patch("main.kraken_engine") as mock_kraken:
            mock_kraken.is_available.return_value = True
            mock_kraken.extract_page_blocks.return_value = [_make_block(1, "Kurrent text")]
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post("/ocr/stream", json=request_body)

    events = []
    for raw_line in response.text.strip().split("\n"):
        if raw_line.strip():
            events.append(json.loads(raw_line))
    page_events = [event for event in events if event["type"] == "page"]

    # One page event per input image, all produced via the kraken mock.
    assert len(page_events) == 3
    mock_kraken.extract_page_blocks.assert_called()
@pytest.mark.asyncio
async def test_ocr_stream_returns_400_when_kraken_unavailable_for_kurrent(mock_images):
    """A HANDWRITING_KURRENT request answers 400 when kraken reports itself unavailable."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "HANDWRITING_KURRENT",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images):
        with patch("main._models_ready", True), patch("main.kraken_engine") as mock_kraken:
            mock_kraken.is_available.return_value = False
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post("/ocr/stream", json=request_body)

    status = response.status_code
    assert status == 400
# ─── SSRF protection ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_ocr_stream_rejects_disallowed_host():
    """A pdfUrl on a non-allowed host is refused with 400 and a "not allowed" detail."""
    request_body = {
        "pdfUrl": "http://evil.example.com/malicious.pdf",
        "scriptType": "TYPEWRITER",
    }
    transport = ASGITransport(app=app)
    with patch("main._models_ready", True):
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/ocr/stream", json=request_body)

    assert response.status_code == 400
    detail = response.json()["detail"]
    assert "not allowed" in detail
@pytest.mark.asyncio
async def test_ocr_stream_allows_minio_host(mock_images):
    """A pdfUrl on the ``minio`` host is accepted and the stream answers 200."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images):
        with patch("main._models_ready", True), patch("main.surya_engine") as mock_surya:
            mock_surya.extract_page_blocks.return_value = [_make_block(0)]
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post("/ocr/stream", json=request_body)

    status = response.status_code
    assert status == 200
@pytest.mark.asyncio
async def test_ocr_stream_applies_confidence_markers(mock_images):
    """Low-confidence words are replaced by [unleserlich] and ``words`` is dropped from the output."""

    def fake_extract(image, page_idx, language="de"):
        # One confident word followed by one very low-confidence word.
        confident_word = {"text": "Lieber", "confidence": 0.95}
        doubtful_word = {"text": "xkqz", "confidence": 0.1}
        block = {
            "pageNumber": page_idx,
            "x": 0.1, "y": 0.2, "width": 0.8, "height": 0.1,
            "polygon": None,
            "text": "original text",
            "words": [confident_word, doubtful_word],
        }
        return [block]

    single_page = mock_images[:1]
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=single_page), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya:
        mock_surya.extract_page_blocks.side_effect = fake_extract
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json=request_body)

    raw_lines = response.text.strip().split("\n")
    events = [json.loads(raw) for raw in raw_lines if raw.strip()]
    page_events = [event for event in events if event["type"] == "page"]
    first_block = page_events[0]["blocks"][0]

    assert first_block["text"] == "Lieber [unleserlich]"
    assert "words" not in first_block
# ─── Preprocessing integration ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_stream_emits_preprocessing_event_per_page_before_page_event(mock_images):
    """generate() must emit a preprocessing event for each page before its page event."""
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya, \
         patch("main.preprocess_page", side_effect=lambda img: img) as mock_preprocess:
        mock_surya.extract_page_blocks.return_value = []
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json={
                "pdfUrl": "http://minio/test.pdf",
                "scriptType": "TYPEWRITER",
            })

    events = [json.loads(line) for line in response.text.strip().split("\n") if line.strip()]
    preprocessing_events = [event for event in events if event["type"] == "preprocessing"]
    page_events = [event for event in events if event["type"] == "page"]

    assert len(preprocessing_events) == 3
    assert preprocessing_events[0]["pageNumber"] == 1
    assert preprocessing_events[1]["pageNumber"] == 2
    assert preprocessing_events[2]["pageNumber"] == 3

    # Preprocessing and page events must refer to the same pages, in the same order
    for pre, page in zip(preprocessing_events, page_events):
        assert pre["pageNumber"] == page["pageNumber"]

    # Stream order: a page event may only appear after the preprocessing event
    # for that page has been emitted. The pairwise comparison above cannot
    # detect a page event that arrives too early, so walk the stream in order.
    announced_pages = set()
    for event in events:
        if event["type"] == "preprocessing":
            announced_pages.add(event["pageNumber"])
        elif event["type"] == "page":
            assert event["pageNumber"] in announced_pages, (
                f"page event for page {event['pageNumber']} arrived before its preprocessing event"
            )

    assert mock_preprocess.call_count == 3
@pytest.mark.asyncio
async def test_guided_stream_preprocesses_once_per_page_not_per_region(mock_images):
    """generate_guided() must preprocess each page once, even when several regions share a page."""
    # Two regions on page 1 and one on page 2: three regions, two distinct pages.
    left_half = {"pageNumber": 1, "x": 0.0, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a1"}
    right_half = {"pageNumber": 1, "x": 0.5, "y": 0.0, "width": 0.5, "height": 0.5, "annotationId": "a2"}
    full_page = {"pageNumber": 2, "x": 0.0, "y": 0.0, "width": 1.0, "height": 1.0, "annotationId": "a3"}
    regions = [left_half, right_half, full_page]

    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
        "regions": regions,
    }
    two_pages = mock_images[:2]
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=two_pages), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya, \
         patch("main.preprocess_page", side_effect=lambda img: img) as mock_preprocess:
        mock_surya.extract_region_text.return_value = "text"
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.post("/ocr/stream", json=request_body)

    assert response.status_code == 200
    # 2 pages, each preprocessed once — not 3 (once per region)
    preprocess_calls = mock_preprocess.call_count
    assert preprocess_calls == 2
@pytest.mark.asyncio
async def test_ocr_endpoint_preprocesses_silently_without_emitting_events(mock_images):
    """POST /ocr answers 200 and calls preprocess_page once for each of the three pages."""
    request_body = {
        "pdfUrl": "http://minio/test.pdf",
        "scriptType": "TYPEWRITER",
    }
    with patch("main._download_and_convert_pdf", new_callable=AsyncMock, return_value=mock_images), \
         patch("main._models_ready", True), \
         patch("main.surya_engine") as mock_surya, \
         patch("main.preprocess_page", side_effect=lambda img: img) as mock_preprocess:
        mock_surya.extract_blocks.return_value = []
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/ocr", json=request_body)

    assert response.status_code == 200
    preprocess_calls = mock_preprocess.call_count
    assert preprocess_calls == 3