feat(ocr): rewrite runSingleDocument to use streamBlocks with per-page progress

Replace the single extractBlocks() call with streamBlocks() that
processes pages incrementally. Each page's blocks are persisted
immediately via createSingleBlock(). Progress updates use the
ANALYZING_PAGE:current:total:blocks format. Per-page errors are
logged at WARN level without failing the entire job. The batch path
(processDocument) remains on the old extractBlocks() path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Marcel
2026-04-13 10:07:06 +02:00
parent 6823973429
commit 292dc66f3c
2 changed files with 182 additions and 6 deletions

View File

@@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@RequiredArgsConstructor
@@ -54,14 +55,56 @@ public class OcrAsyncRunner {
String pdfUrl = fileService.generatePresignedUrl(doc.getFilePath());
updateProgress(job, "ANALYZING");
List<OcrBlockResult> blocks = ocrClient.extractBlocks(pdfUrl, doc.getScriptType());
updateProgress(job, "CREATING_BLOCKS:" + blocks.size());
createTranscriptionBlocks(documentId, blocks, userId, doc.getFileHash());
AtomicInteger blockCounter = new AtomicInteger(0);
AtomicInteger currentPage = new AtomicInteger(0);
AtomicInteger skippedPages = new AtomicInteger(0);
AtomicInteger totalPages = new AtomicInteger(0);
ocrClient.streamBlocks(pdfUrl, doc.getScriptType(), event -> {
switch (event) {
case OcrStreamEvent.Start start -> {
totalPages.set(start.totalPages());
if (jobDoc != null) {
jobDoc.setTotalPages(start.totalPages());
ocrJobDocumentRepository.save(jobDoc);
}
}
case OcrStreamEvent.Page page -> {
for (OcrBlockResult block : page.blocks()) {
createSingleBlock(documentId, block, userId,
doc.getFileHash(), blockCounter.getAndIncrement());
}
currentPage.incrementAndGet();
if (jobDoc != null) {
jobDoc.setCurrentPage(currentPage.get());
ocrJobDocumentRepository.save(jobDoc);
}
updateProgress(job, "ANALYZING_PAGE:" + currentPage.get()
+ ":" + totalPages.get() + ":" + blockCounter.get());
}
case OcrStreamEvent.Error error -> {
log.warn("OCR page {} failed for document {}: {}",
error.pageNumber(), documentId, error.message());
skippedPages.incrementAndGet();
currentPage.incrementAndGet();
if (jobDoc != null) {
jobDoc.setCurrentPage(currentPage.get());
ocrJobDocumentRepository.save(jobDoc);
}
}
case OcrStreamEvent.Done done -> {
if (jobDoc != null) {
jobDoc.setCurrentPage(totalPages.get());
ocrJobDocumentRepository.save(jobDoc);
}
}
}
});
job.setStatus(OcrJobStatus.DONE);
job.setProcessedDocuments(1);
updateProgress(job, "DONE:" + blocks.size());
updateProgress(job, "DONE:" + blockCounter.get() + ":" + skippedPages.get());
if (jobDoc != null) {
jobDoc.setStatus(OcrDocumentStatus.DONE);
ocrJobDocumentRepository.save(jobDoc);

View File

@@ -12,9 +12,11 @@ import org.raddatz.familienarchiv.repository.OcrJobDocumentRepository;
import org.raddatz.familienarchiv.repository.OcrJobRepository;
import org.raddatz.familienarchiv.repository.TranscriptionBlockRepository;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@@ -103,15 +105,25 @@ class OcrAsyncRunnerTest {
UUID docId = UUID.randomUUID();
UUID userId = UUID.randomUUID();
OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
.jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
Document doc = Document.builder().id(docId).filePath("test.pdf")
.fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
.thenReturn(Optional.of(jobDoc));
when(documentService.getDocumentById(docId)).thenReturn(doc);
when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
when(ocrClient.extractBlocks(any(), any())).thenReturn(List.of());
doAnswer(inv -> {
Consumer<OcrStreamEvent> handler = inv.getArgument(2);
handler.accept(new OcrStreamEvent.Start(1));
handler.accept(new OcrStreamEvent.Page(0, List.of()));
handler.accept(new OcrStreamEvent.Done(0, 0));
return null;
}).when(ocrClient).streamBlocks(any(), any(), any());
ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
@@ -124,19 +136,140 @@ class OcrAsyncRunnerTest {
UUID docId = UUID.randomUUID();
UUID userId = UUID.randomUUID();
OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
.jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
Document doc = Document.builder().id(docId).filePath("test.pdf")
.fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
.thenReturn(Optional.of(jobDoc));
when(documentService.getDocumentById(docId)).thenReturn(doc);
when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
when(ocrClient.extractBlocks(any(), any())).thenThrow(new RuntimeException("OCR failed"));
doThrow(new RuntimeException("OCR failed")).when(ocrClient).streamBlocks(any(), any(), any());
ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
assertThat(job.getStatus()).isEqualTo(OcrJobStatus.FAILED);
assertThat(job.getErrorCount()).isEqualTo(1);
}
@Test
void runSingleDocument_updatesProgressPerPage() {
UUID jobId = UUID.randomUUID();
UUID docId = UUID.randomUUID();
UUID userId = UUID.randomUUID();
OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
.jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
Document doc = Document.builder().id(docId).filePath("test.pdf")
.fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
DocumentAnnotation ann = DocumentAnnotation.builder().id(UUID.randomUUID()).build();
when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
.thenReturn(Optional.of(jobDoc));
when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(documentService.getDocumentById(docId)).thenReturn(doc);
when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
when(annotationService.createOcrAnnotation(any(), any(), any(), any(), any())).thenReturn(ann);
List<String> progressMessages = new ArrayList<>();
doAnswer(inv -> {
Consumer<OcrStreamEvent> handler = inv.getArgument(2);
handler.accept(new OcrStreamEvent.Start(3));
handler.accept(new OcrStreamEvent.Page(0, List.of(
new OcrBlockResult(0, 0.1, 0.1, 0.8, 0.04, null, "L1"),
new OcrBlockResult(0, 0.1, 0.2, 0.8, 0.04, null, "L2"))));
progressMessages.add(job.getProgressMessage());
handler.accept(new OcrStreamEvent.Page(1, List.of(
new OcrBlockResult(1, 0.1, 0.1, 0.8, 0.04, null, "L3"))));
progressMessages.add(job.getProgressMessage());
handler.accept(new OcrStreamEvent.Done(3, 0));
return null;
}).when(ocrClient).streamBlocks(any(), any(), any());
ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
assertThat(progressMessages.get(0)).isEqualTo("ANALYZING_PAGE:1:3:2");
assertThat(progressMessages.get(1)).isEqualTo("ANALYZING_PAGE:2:3:3");
assertThat(job.getProgressMessage()).isEqualTo("DONE:3:0");
}
@Test
void runSingleDocument_includesSkippedPagesInDoneMessage() {
UUID jobId = UUID.randomUUID();
UUID docId = UUID.randomUUID();
UUID userId = UUID.randomUUID();
OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
.jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
Document doc = Document.builder().id(docId).filePath("test.pdf")
.fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
.thenReturn(Optional.of(jobDoc));
when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(documentService.getDocumentById(docId)).thenReturn(doc);
when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
doAnswer(inv -> {
Consumer<OcrStreamEvent> handler = inv.getArgument(2);
handler.accept(new OcrStreamEvent.Start(3));
handler.accept(new OcrStreamEvent.Page(0, List.of()));
handler.accept(new OcrStreamEvent.Error(1, "failed"));
handler.accept(new OcrStreamEvent.Page(2, List.of()));
handler.accept(new OcrStreamEvent.Done(0, 1));
return null;
}).when(ocrClient).streamBlocks(any(), any(), any());
ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
assertThat(job.getStatus()).isEqualTo(OcrJobStatus.DONE);
assertThat(job.getProgressMessage()).isEqualTo("DONE:0:1");
}
@Test
void runSingleDocument_logsStreamErrorAtWarnWithoutSettingJobFailed() {
UUID jobId = UUID.randomUUID();
UUID docId = UUID.randomUUID();
UUID userId = UUID.randomUUID();
OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
.jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
Document doc = Document.builder().id(docId).filePath("test.pdf")
.fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
.thenReturn(Optional.of(jobDoc));
when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
when(documentService.getDocumentById(docId)).thenReturn(doc);
when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
doAnswer(inv -> {
Consumer<OcrStreamEvent> handler = inv.getArgument(2);
handler.accept(new OcrStreamEvent.Start(2));
handler.accept(new OcrStreamEvent.Error(0, "some python traceback details"));
handler.accept(new OcrStreamEvent.Page(1, List.of()));
handler.accept(new OcrStreamEvent.Done(0, 1));
return null;
}).when(ocrClient).streamBlocks(any(), any(), any());
ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
// Job should still be DONE, not FAILED (per-page errors don't fail the whole job)
assertThat(job.getStatus()).isEqualTo(OcrJobStatus.DONE);
// Raw error message should not leak to progress
assertThat(job.getProgressMessage()).doesNotContain("python traceback");
}
}