"""OCR microservice — FastAPI app with Surya and Kraken engine support.""" import asyncio import glob import inspect import io import json import logging import os import re import shutil import subprocess import tempfile import time import zipfile from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Awaitable, Callable from urllib.parse import urlparse import httpx import pypdfium2 as pdfium from fastapi import FastAPI, Form, Header, HTTPException, UploadFile from fastapi.responses import StreamingResponse from PIL import Image from prometheus_client import REGISTRY from prometheus_fastapi_instrumentator import Instrumentator from confidence import apply_confidence_markers, get_threshold from metrics import OcrMetrics, build_metrics from spell_check import correct_text, load_spell_checker from engines import kraken as kraken_engine from engines import surya as surya_engine from models import OcrBlock, OcrRequest from preprocessing import preprocess_page from utils import _validate_zip_entry TRAINING_TOKEN = os.environ.get("TRAINING_TOKEN", "") KRAKEN_MODEL_PATH = os.environ.get("KRAKEN_MODEL_PATH", "/app/models/german_kurrent.mlmodel") logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) _models_ready = False # One-shot import-time binding to the default REGISTRY. Tests that need a # clean counter state must monkeypatch `main.metrics` with a container built # from a fresh CollectorRegistry — rebinding through the registry directly # will not retarget the references stored in the OcrMetrics dataclass. metrics: OcrMetrics = build_metrics(REGISTRY) ALLOWED_PDF_HOSTS = set( h.strip() for h in os.getenv("ALLOWED_PDF_HOSTS", "minio,localhost,127.0.0.1").split(",") ) _SPELL_CHECK_SCRIPT_TYPES = {"HANDWRITING_KURRENT", "HANDWRITING_LATIN"} async def _record_training( runner: Callable[[], Awaitable[dict] | dict], kind: str, ) -> dict: """Run a training callable and record outcome + accuracy metrics. Wraps the per-endpoint try/except + outcome counter + accuracy gauge block that used to be repeated at /train, /train-sender, and /segtrain. The runner returns a dict with at least an `accuracy` key; if its value is None, the gauge is left at its default. """ try: result = runner() if inspect.isawaitable(result): result = await result except Exception: metrics.ocr_training_runs_total.labels(kind=kind, outcome="error").inc() raise metrics.ocr_training_runs_total.labels(kind=kind, outcome="success").inc() if result.get("accuracy") is not None: metrics.ocr_model_accuracy.labels(kind=kind).set(result["accuracy"]) return result def _observe_block_words(words: list[dict], threshold: float) -> None: """Record per-block word counts and below-threshold word counts. Pre: `words` is non-empty. Caller checks for that — keeping the helper branch-free makes the call sites read as a single line. """ metrics.ocr_words_total.inc(len(words)) metrics.ocr_illegible_words_total.inc( sum(1 for w in words if w["confidence"] < threshold) ) def _validate_url(url: str) -> None: """Validate that the PDF URL points to an allowed host (SSRF protection).""" parsed = urlparse(url) hostname = parsed.hostname or "" if hostname not in ALLOWED_PDF_HOSTS: raise HTTPException(status_code=400, detail=f"PDF host not allowed: {hostname}") @asynccontextmanager async def lifespan(app: FastAPI): """Load lightweight models at startup. Surya loads lazily on first request.""" global _models_ready if os.getuid() == 0: logger.warning("Running as root — CIS Docker §4.1 violation") logger.info("Loading Kraken model at startup (Surya loads lazily on first OCR request)...") kraken_engine.load_models() load_spell_checker() _models_ready = True metrics.ocr_models_ready.set(1) logger.info("Startup complete — ready to accept requests") yield logger.info("Shutting down OCR service") app = FastAPI(title="Familienarchiv OCR Service", lifespan=lifespan) # /metrics is unauthenticated — relies on Docker-internal-network exposure # only (CWE-200 risk if `ports:` ever maps 8000 to host). See # docs/OBSERVABILITY.md §Internal-only endpoints for the Caddy block snippet. Instrumentator(excluded_handlers=["/health", "/metrics"]).instrument(app).expose(app) class MetricsPathFilter(logging.Filter): """Drop uvicorn.access entries for /metrics and /health to keep logs focused.""" _SUPPRESSED_PATHS = {"/metrics", "/health"} def filter(self, record: logging.LogRecord) -> bool: # uvicorn.access formats as: '%s - "%s %s HTTP/%s" %d' if record.args and len(record.args) >= 3: path = record.args[2] if isinstance(path, str) and path in self._SUPPRESSED_PATHS: return False return True logging.getLogger("uvicorn.access").addFilter(MetricsPathFilter()) @app.get("/health") def health(): """Health endpoint — returns 200 only after models are loaded.""" if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") return {"status": "ok", "surya": True, "kraken": kraken_engine.is_available()} @app.post("/ocr", response_model=list[OcrBlock]) async def run_ocr(request: OcrRequest): """Run OCR on a PDF document. Downloads the PDF from the provided URL, converts pages to images, and runs the appropriate OCR engine based on scriptType. OCR engines run in a thread pool so the event loop stays free for /health. """ if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") images = await _download_and_convert_pdf(request.pdfUrl) for i, img in enumerate(images): images[i] = await asyncio.to_thread(preprocess_page, img) del img script_type = request.scriptType.upper() engine_name = "kraken" if script_type == "HANDWRITING_KURRENT" else "surya" extract_started = time.monotonic() if script_type == "HANDWRITING_KURRENT": if not kraken_engine.is_available(): raise HTTPException( status_code=400, detail="Kraken model not available — cannot process Kurrent script", ) blocks = await asyncio.to_thread( kraken_engine.extract_blocks, images, request.language, request.senderModelPath) else: # TYPEWRITER, HANDWRITING_LATIN, UNKNOWN — all use Surya blocks = await asyncio.to_thread(surya_engine.extract_blocks, images, request.language) metrics.ocr_processing_seconds.labels(engine=engine_name).observe( time.monotonic() - extract_started ) metrics.ocr_jobs_total.labels(engine=engine_name, script_type=script_type).inc() threshold = get_threshold(script_type) for block in blocks: words = block.get("words") or [] if words: _observe_block_words(words, threshold) block["text"] = apply_confidence_markers(words, threshold) block.pop("words", None) if script_type in _SPELL_CHECK_SCRIPT_TYPES: block["text"] = correct_text(block["text"]) return [OcrBlock(**b) for b in blocks] @app.post("/ocr/stream") async def run_ocr_stream(request: OcrRequest): """Run OCR on a PDF with NDJSON streaming — one JSON line per completed page. When request.regions is provided, runs in guided mode: each region is cropped and recognized individually, skipping full-page layout detection. The response blocks include the annotationId from the region. """ if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") images = await _download_and_convert_pdf(request.pdfUrl) script_type = request.scriptType.upper() threshold = get_threshold(script_type) use_kraken = script_type == "HANDWRITING_KURRENT" if use_kraken and not kraken_engine.is_available(): raise HTTPException( status_code=400, detail="Kraken model not available — cannot process Kurrent script", ) engine = kraken_engine if use_kraken else surya_engine engine_name = "kraken" if use_kraken else "surya" metrics.ocr_jobs_total.labels(engine=engine_name, script_type=script_type).inc() if request.regions: # Guided mode: recognize only the user-drawn annotation regions regions_by_page: dict[int, list] = {} for region in request.regions: regions_by_page.setdefault(region.pageNumber, []).append(region) async def generate_guided(): total_pages = len(images) yield json.dumps({"type": "start", "totalPages": total_pages}) + "\n" total_blocks = 0 skipped_pages = 0 for page_idx, image in enumerate(images, start=1): page_regions = regions_by_page.get(page_idx, []) if not page_regions: yield json.dumps({ "type": "page", "pageNumber": page_idx, "blocks": [], }) + "\n" del image continue try: yield json.dumps({"type": "preprocessing", "pageNumber": page_idx}) + "\n" image = await asyncio.to_thread(preprocess_page, image) blocks = [] sender_path = request.senderModelPath if use_kraken else None engine_seconds = 0.0 for region in page_regions: region_started = time.monotonic() text = await asyncio.to_thread( engine.extract_region_text, image, region.x, region.y, region.width, region.height, sender_path, ) engine_seconds += time.monotonic() - region_started if script_type in _SPELL_CHECK_SCRIPT_TYPES: text = correct_text(text) blocks.append({ "pageNumber": page_idx, "x": region.x, "y": region.y, "width": region.width, "height": region.height, "polygon": None, "text": text, "annotationId": region.annotationId, }) metrics.ocr_processing_seconds.labels(engine=engine_name).observe( engine_seconds ) total_blocks += len(blocks) metrics.ocr_pages_total.labels(engine=engine_name).inc() yield json.dumps({ "type": "page", "pageNumber": page_idx, "blocks": blocks, }) + "\n" except Exception: logger.exception("Guided OCR failed on page %d", page_idx) skipped_pages += 1 metrics.ocr_skipped_pages_total.inc() yield json.dumps({ "type": "error", "pageNumber": page_idx, "message": f"Guided OCR processing failed on page {page_idx}", }) + "\n" finally: del image yield json.dumps({ "type": "done", "totalBlocks": total_blocks, "skippedPages": skipped_pages, }) + "\n" return StreamingResponse( generate_guided(), media_type="application/x-ndjson", headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"}, ) async def generate(): total_pages = len(images) yield json.dumps({"type": "start", "totalPages": total_pages}) + "\n" total_blocks = 0 skipped_pages = 0 for page_idx, image in enumerate(images, start=1): try: yield json.dumps({"type": "preprocessing", "pageNumber": page_idx}) + "\n" image = await asyncio.to_thread(preprocess_page, image) sender_path = request.senderModelPath if use_kraken else None page_started = time.monotonic() blocks = await asyncio.to_thread( engine.extract_page_blocks, image, page_idx, request.language, sender_path ) metrics.ocr_processing_seconds.labels(engine=engine_name).observe( time.monotonic() - page_started ) for block in blocks: words = block.get("words") or [] if words: _observe_block_words(words, threshold) block["text"] = apply_confidence_markers(words, threshold) block.pop("words", None) if script_type in _SPELL_CHECK_SCRIPT_TYPES: block["text"] = correct_text(block["text"]) total_blocks += len(blocks) metrics.ocr_pages_total.labels(engine=engine_name).inc() yield json.dumps({ "type": "page", "pageNumber": page_idx, "blocks": blocks, }) + "\n" except Exception: logger.exception("OCR failed on page %d", page_idx) skipped_pages += 1 metrics.ocr_skipped_pages_total.inc() yield json.dumps({ "type": "error", "pageNumber": page_idx, "message": f"OCR processing failed on page {page_idx}", }) + "\n" finally: del image yield json.dumps({ "type": "done", "totalBlocks": total_blocks, "skippedPages": skipped_pages, }) + "\n" return StreamingResponse( generate(), media_type="application/x-ndjson", headers={ "X-Accel-Buffering": "no", "Cache-Control": "no-cache", }, ) def _check_training_token(x_training_token: str | None) -> None: """Validate training token — fails closed when TRAINING_TOKEN is not configured.""" if not TRAINING_TOKEN: raise HTTPException(status_code=503, detail="Training not configured on this node") if x_training_token != TRAINING_TOKEN: raise HTTPException(status_code=403, detail="Invalid or missing X-Training-Token") def _rotate_backups(model_path: str, keep: int = 3) -> None: """Keep only the last `keep` timestamped backups of the model.""" pattern = model_path + ".*.bak" backups = sorted(glob.glob(pattern)) for old in backups[:-keep]: try: os.remove(old) except OSError: logger.warning("Could not remove old backup: %s", old) def _parse_best_checkpoint(checkpoint_dir: str) -> tuple[float | None, int]: """Parse checkpoint filenames to find the best validation metric and epoch count. Kraken saves checkpoints as e.g. ``checkpoint_03-0.9500.ckpt``. Returns (best_accuracy, epoch_count). """ pattern = re.compile(r"checkpoint_(\d+)-([0-9.]+)\.(ckpt|mlmodel)$") best_acc: float | None = None max_epoch = 0 for fname in os.listdir(checkpoint_dir): m = pattern.match(fname) if m: epoch = int(m.group(1)) acc = float(m.group(2)) max_epoch = max(max_epoch, epoch) if best_acc is None or acc > best_acc: best_acc = acc return best_acc, max_epoch def _find_best_model(checkpoint_dir: str) -> str | None: """Return the best final model file produced by ketos train. With --weights-format coreml, ketos writes ``best_.mlmodel``. Falls back to any .mlmodel in the directory. """ # Prefer the named best file (e.g. best_0.8256.mlmodel or best_0.8256.safetensors) best_pattern = re.compile(r"best_([0-9.]+)\.(mlmodel|safetensors)$") best_acc: float | None = None best_path: str | None = None for fname in os.listdir(checkpoint_dir): m = best_pattern.match(fname) if m: acc = float(m.group(1)) if best_acc is None or acc > best_acc: best_acc = acc best_path = os.path.join(checkpoint_dir, fname) if best_path: return best_path # Fallback: any .mlmodel file for fname in os.listdir(checkpoint_dir): if fname.endswith(".mlmodel"): return os.path.join(checkpoint_dir, fname) return None @app.post("/train") async def train_model( file: UploadFile, x_training_token: str | None = Header(default=None), ): """Fine-tune the Kurrent recognition model with uploaded training data. Accepts a ZIP archive containing .png/.gt.txt training pairs exported by the Java backend. Training mutates in-process model state — not safe if the service is replicated. """ _check_training_token(x_training_token) if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") zip_bytes = await file.read() training_run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") log = logging.LoggerAdapter(logger, {"training_run_id": training_run_id}) log.info("Starting training run %s", training_run_id) def _run_training() -> dict: with tempfile.TemporaryDirectory() as tmp_dir: # Extract ZIP with safety checks with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: for entry in zf.namelist(): _validate_zip_entry(entry, tmp_dir) zf.extractall(tmp_dir) log.info("Extracted %d ZIP entries to %s", len(os.listdir(tmp_dir)), tmp_dir) ground_truth = glob.glob(os.path.join(tmp_dir, "*.xml")) if not ground_truth: raise HTTPException(status_code=422, detail="No ground-truth files found in ZIP") log.info("Training on %d ground-truth pairs", len(ground_truth)) checkpoint_dir = os.path.join(tmp_dir, "checkpoints") os.makedirs(checkpoint_dir, exist_ok=True) cmd = [ "ketos", "--workers", "0", "--device", "cpu", "--threads", "2", "train", "-f", "page", "--weights-format", "coreml", "-o", checkpoint_dir, "-q", "fixed", "-N", "10", "-B", "1", ] if os.path.exists(KRAKEN_MODEL_PATH): cmd += ["-i", KRAKEN_MODEL_PATH] cmd += ground_truth log.info("Running: %s", " ".join(cmd[:6]) + " ...") proc = subprocess.run(cmd, capture_output=True, text=True) log.info("ketos train stdout: %s", proc.stdout[-2000:] if proc.stdout else "") if proc.stderr: log.info("ketos train stderr: %s", proc.stderr[-2000:]) if proc.returncode != 0: raise RuntimeError(f"ketos train failed (exit {proc.returncode}): {proc.stderr[-500:]}") accuracy, epochs = _parse_best_checkpoint(checkpoint_dir) cer = round(1.0 - accuracy, 4) if accuracy is not None else None log.info("Training complete — epochs=%s accuracy=%s cer=%s", epochs, accuracy, cer) # Find the best model file produced by training best_model = _find_best_model(checkpoint_dir) if best_model is None: raise RuntimeError("Training produced no model file") # Backup existing model and replace if os.path.exists(KRAKEN_MODEL_PATH): timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") backup_path = f"{KRAKEN_MODEL_PATH}.{timestamp}.bak" shutil.copy2(KRAKEN_MODEL_PATH, backup_path) log.info("Backed up model to %s", backup_path) _rotate_backups(KRAKEN_MODEL_PATH, keep=3) shutil.copy2(best_model, KRAKEN_MODEL_PATH) log.info("Replaced model at %s", KRAKEN_MODEL_PATH) # Reload model in-process kraken_engine.load_models() log.info("Reloaded Kraken model in-process") return {"loss": None, "accuracy": accuracy, "cer": cer, "epochs": epochs} return await _record_training(lambda: asyncio.to_thread(_run_training), kind="recognition") @app.post("/train-sender") async def train_sender_model( file: UploadFile, output_model_path: str = Form(...), x_training_token: str | None = Header(default=None), ): """Fine-tune a per-sender Kurrent model and save to output_model_path. output_model_path must be within /app/models/. The per-sender model cache is invalidated after training so the next OCR request picks up the new model. """ _check_training_token(x_training_token) if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") models_dir = os.path.realpath("/app/models") resolved_output = os.path.realpath(output_model_path) if not resolved_output.startswith(models_dir + os.sep): raise HTTPException(status_code=400, detail="output_model_path must be within /app/models/") zip_bytes = await file.read() training_run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") log = logging.LoggerAdapter(logger, {"training_run_id": training_run_id}) log.info("Starting sender training run %s → %s", training_run_id, output_model_path) def _run_sender_training() -> dict: with tempfile.TemporaryDirectory() as tmp_dir: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: for entry in zf.namelist(): _validate_zip_entry(entry, tmp_dir) zf.extractall(tmp_dir) ground_truth = glob.glob(os.path.join(tmp_dir, "*.xml")) if not ground_truth: raise HTTPException(status_code=422, detail="No ground-truth files found in ZIP") log.info("Sender training on %d ground-truth pairs", len(ground_truth)) checkpoint_dir = os.path.join(tmp_dir, "checkpoints") os.makedirs(checkpoint_dir, exist_ok=True) cmd = [ "ketos", "--workers", "0", "--device", "cpu", "--threads", "2", "train", "-f", "page", "--weights-format", "coreml", "-o", checkpoint_dir, "-q", "fixed", "-N", "10", "-B", "1", ] if os.path.exists(KRAKEN_MODEL_PATH): cmd += ["-i", KRAKEN_MODEL_PATH] cmd += ground_truth log.info("Running sender training: %s", " ".join(cmd[:6]) + " ...") proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: raise RuntimeError(f"ketos train failed (exit {proc.returncode}): {proc.stderr[-500:]}") accuracy, epochs = _parse_best_checkpoint(checkpoint_dir) cer = round(1.0 - accuracy, 4) if accuracy is not None else None best_model = _find_best_model(checkpoint_dir) if best_model is None: raise RuntimeError("Sender training produced no model file") os.makedirs(os.path.dirname(resolved_output), exist_ok=True) shutil.copy2(best_model, resolved_output) log.info("Saved sender model to %s", resolved_output) kraken_engine._sender_registry.invalidate(output_model_path) return {"loss": None, "accuracy": accuracy, "cer": cer, "epochs": epochs} return await _record_training( lambda: asyncio.to_thread(_run_sender_training), kind="recognition" ) @app.post("/segtrain") async def segtrain_model( file: UploadFile, x_training_token: str | None = Header(default=None), ): """Fine-tune the blla segmentation model with uploaded PAGE XML training data. Accepts a ZIP archive containing .png/.xml (PAGE XML) training pairs exported by the Java backend. Training mutates in-process model state — not safe if the service is replicated. """ _check_training_token(x_training_token) if not _models_ready: raise HTTPException(status_code=503, detail="Models not loaded yet") zip_bytes = await file.read() training_run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") log = logging.LoggerAdapter(logger, {"training_run_id": training_run_id}) log.info("Starting segmentation training run %s", training_run_id) blla_model_path = os.environ.get("BLLA_MODEL_PATH", "/app/models/blla.mlmodel") def _run_segtrain() -> dict: with tempfile.TemporaryDirectory() as tmp_dir: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: for entry in zf.namelist(): _validate_zip_entry(entry, tmp_dir) zf.extractall(tmp_dir) log.info("Extracted %d ZIP entries for segmentation training", len(os.listdir(tmp_dir))) xml_files = glob.glob(os.path.join(tmp_dir, "*.xml")) if not xml_files: raise HTTPException(status_code=422, detail="No PAGE XML files found in ZIP") log.info("Training on %d PAGE XML files", len(xml_files)) checkpoint_dir = os.path.join(tmp_dir, "checkpoints") os.makedirs(checkpoint_dir, exist_ok=True) cmd = [ "ketos", "--workers", "0", "--device", "cpu", "--threads", "2", "segtrain", "-o", checkpoint_dir, "-q", "fixed", "-N", "10", ] # Train at 800px height. The default blla model uses 1800px, which peaks at # ~7+ GB on CPU and kills the host (ketos ignores -s when -i is present, so # we cannot override the height of an existing model). # Strategy: only use the base model if it is already at 800px (i.e. was # produced by a previous fine-tuning run here). Otherwise train from scratch — # the first run bootstraps a 800px model; all subsequent runs fine-tune it. seg_spec = ( "[1,800,0,3 Cr7,7,64,2,2 Gn32 Cr3,3,128,2,2 Gn32 Cr3,3,128 Gn32 " "Cr3,3,256 Gn32 Cr3,3,256 Gn32 Lbx32 Lby32 Cr1,1,32 Gn32 Lby32 Lbx32]" ) use_base_model = False if os.path.exists(blla_model_path): try: from kraken.lib import vgsl as _vgsl _m = _vgsl.TorchVGSLModel.load_model(blla_model_path) use_base_model = _m.input[2] == 800 # input is (batch, channels, H, W) if not use_base_model: log.info( "Base model height is %dpx — skipping -i to avoid OOM; " "will train from scratch at 800px", _m.input[2], ) except Exception as exc: log.warning("Could not inspect base model height, training from scratch: %s", exc) if use_base_model: cmd += ["-i", blla_model_path, "--resize", "union", "-s", seg_spec] else: cmd += ["-s", seg_spec] cmd += xml_files log.info("Running: %s", " ".join(cmd[:5]) + " ...") proc = subprocess.run(cmd, capture_output=True, text=True) log.info("ketos segtrain stdout: %s", proc.stdout[-2000:] if proc.stdout else "") if proc.stderr: log.info("ketos segtrain stderr: %s", proc.stderr[-2000:]) if proc.returncode != 0: raise RuntimeError(f"ketos segtrain failed (exit {proc.returncode}): {proc.stderr[-500:]}") accuracy, epochs = _parse_best_checkpoint(checkpoint_dir) cer = round(1.0 - accuracy, 4) if accuracy is not None else None log.info("Segmentation training complete — epochs=%s accuracy=%s cer=%s", epochs, accuracy, cer) best_model = _find_best_model(checkpoint_dir) if best_model is None: raise RuntimeError("Segmentation training produced no model file") if os.path.exists(blla_model_path): timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") backup_path = f"{blla_model_path}.{timestamp}.bak" shutil.copy2(blla_model_path, backup_path) _rotate_backups(blla_model_path, keep=3) shutil.copy2(best_model, blla_model_path) log.info("Replaced blla model at %s", blla_model_path) return {"loss": None, "accuracy": accuracy, "cer": cer, "epochs": epochs} return await _record_training(lambda: asyncio.to_thread(_run_segtrain), kind="segmentation") async def _download_and_convert_pdf(url: str) -> list[Image.Image]: """Download a PDF from a presigned URL and convert each page to a PIL Image.""" _validate_url(url) async with httpx.AsyncClient( timeout=httpx.Timeout(300.0), follow_redirects=False ) as client: response = await client.get(url) response.raise_for_status() pdf = pdfium.PdfDocument(io.BytesIO(response.content)) images = [] for page_idx in range(len(pdf)): page = pdf[page_idx] # Render at 200 DPI — balances OCR quality vs memory usage # (Surya 0.17 models use ~5GB idle; 300 DPI causes OOM on multi-page docs) bitmap = page.render(scale=200 / 72) pil_image = bitmap.to_pil() images.append(pil_image) return images