refactor(ocr): extract _validate_zip_entry to utils.py so ZIP Slip test runs in CI
_validate_zip_entry has no ML-stack dependency; importing it via main.py pulled in surya/torch and caused the test to be skipped in CI. Moving it to utils.py (fastapi only) and adding fastapi to the CI lightweight install lets test_zipslip_still_anchors_under_custom_tmpdir run on every push. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -27,6 +27,7 @@ from engines import kraken as kraken_engine
|
||||
from engines import surya as surya_engine
|
||||
from models import OcrBlock, OcrRequest
|
||||
from preprocessing import preprocess_page
|
||||
from utils import _validate_zip_entry
|
||||
|
||||
TRAINING_TOKEN = os.environ.get("TRAINING_TOKEN", "")
|
||||
KRAKEN_MODEL_PATH = os.environ.get("KRAKEN_MODEL_PATH", "/app/models/german_kurrent.mlmodel")
|
||||
@@ -291,14 +292,6 @@ def _check_training_token(x_training_token: str | None) -> None:
|
||||
raise HTTPException(status_code=403, detail="Invalid or missing X-Training-Token")
|
||||
|
||||
|
||||
def _validate_zip_entry(name: str, extract_dir: str) -> None:
|
||||
"""Reject ZIP Slip attacks: path traversal and absolute paths."""
|
||||
if os.path.isabs(name) or name.startswith(".."):
|
||||
raise HTTPException(status_code=400, detail=f"Unsafe ZIP entry: {name}")
|
||||
resolved = os.path.realpath(os.path.join(extract_dir, name))
|
||||
if not resolved.startswith(os.path.realpath(extract_dir)):
|
||||
raise HTTPException(status_code=400, detail=f"ZIP Slip detected: {name}")
|
||||
|
||||
|
||||
def _rotate_backups(model_path: str, keep: int = 3) -> None:
|
||||
"""Keep only the last `keep` timestamped backups of the model."""
|
||||
|
||||
@@ -6,12 +6,8 @@ import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from fastapi import HTTPException
|
||||
from main import _validate_zip_entry
|
||||
HAS_MAIN = True
|
||||
except ImportError:
|
||||
HAS_MAIN = False
|
||||
from fastapi import HTTPException
|
||||
from utils import _validate_zip_entry
|
||||
|
||||
_ENTRYPOINT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "entrypoint.sh")
|
||||
|
||||
@@ -82,7 +78,6 @@ def test_tmpdir_is_inside_persistent_cache_volume():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not HAS_MAIN, reason="requires full ML stack (not available in CI)")
|
||||
def test_zipslip_still_anchors_under_custom_tmpdir(tmp_path):
|
||||
"""_validate_zip_entry rejects path-traversal when extract_dir is under a custom TMPDIR.
|
||||
|
||||
|
||||
14
ocr-service/utils.py
Normal file
14
ocr-service/utils.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""Utility functions shared across the OCR service with no ML-stack imports."""
|
||||
|
||||
import os
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
|
||||
def _validate_zip_entry(name: str, extract_dir: str) -> None:
|
||||
"""Reject ZIP Slip attacks: path traversal and absolute paths."""
|
||||
if os.path.isabs(name) or name.startswith(".."):
|
||||
raise HTTPException(status_code=400, detail=f"Unsafe ZIP entry: {name}")
|
||||
resolved = os.path.realpath(os.path.join(extract_dir, name))
|
||||
if not resolved.startswith(os.path.realpath(extract_dir)):
|
||||
raise HTTPException(status_code=400, detail=f"ZIP Slip detected: {name}")
|
||||
Reference in New Issue
Block a user