OCR runner: consume page-level stream events instead of one blocking extractBlocks call.

Reviewer notes on this revision of the patch:
  * Line structure, indentation and the position of blank context lines were rebuilt
    from a whitespace-collapsed copy; regenerate against the target tree before applying.
  * Added test code now uses Consumer<OcrStreamEvent> / List<String> instead of raw types.
  * Explanatory comments were added on '+' lines only; hunk counts were adjusted to match.
  * The index hashes below are carried over unchanged and no longer describe the post-image.

diff --git a/backend/src/main/java/org/raddatz/familienarchiv/service/OcrAsyncRunner.java b/backend/src/main/java/org/raddatz/familienarchiv/service/OcrAsyncRunner.java
index bccc923a..b3ab3bea 100644
--- a/backend/src/main/java/org/raddatz/familienarchiv/service/OcrAsyncRunner.java
+++ b/backend/src/main/java/org/raddatz/familienarchiv/service/OcrAsyncRunner.java
@@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
 @RequiredArgsConstructor
@@ -54,14 +55,60 @@ public class OcrAsyncRunner {
 
             String pdfUrl = fileService.generatePresignedUrl(doc.getFilePath());
             updateProgress(job, "ANALYZING");
-            List blocks = ocrClient.extractBlocks(pdfUrl, doc.getScriptType());
-            updateProgress(job, "CREATING_BLOCKS:" + blocks.size());
 
-            createTranscriptionBlocks(documentId, blocks, userId, doc.getFileHash());
+            // Mutable holders: the counters below are updated from inside the stream callback lambda.
+            AtomicInteger blockCounter = new AtomicInteger(0);
+            AtomicInteger currentPage = new AtomicInteger(0);
+            AtomicInteger skippedPages = new AtomicInteger(0);
+            AtomicInteger totalPages = new AtomicInteger(0);
+
+            ocrClient.streamBlocks(pdfUrl, doc.getScriptType(), event -> {
+                switch (event) {
+                    case OcrStreamEvent.Start start -> {
+                        totalPages.set(start.totalPages());
+                        if (jobDoc != null) {
+                            jobDoc.setTotalPages(start.totalPages());
+                            ocrJobDocumentRepository.save(jobDoc);
+                        }
+                    }
+                    case OcrStreamEvent.Page page -> {
+                        // blockCounter keeps counting across pages, so every block gets a document-wide index.
+                        for (OcrBlockResult block : page.blocks()) {
+                            createSingleBlock(documentId, block, userId,
+                                    doc.getFileHash(), blockCounter.getAndIncrement());
+                        }
+                        currentPage.incrementAndGet();
+                        if (jobDoc != null) {
+                            jobDoc.setCurrentPage(currentPage.get());
+                            ocrJobDocumentRepository.save(jobDoc);
+                        }
+                        updateProgress(job, "ANALYZING_PAGE:" + currentPage.get()
+                                + ":" + totalPages.get() + ":" + blockCounter.get());
+                    }
+                    case OcrStreamEvent.Error error -> {
+                        // A failed page is logged, counted as skipped and still advances the page counter.
+                        log.warn("OCR page {} failed for document {}: {}",
+                                error.pageNumber(), documentId, error.message());
+                        skippedPages.incrementAndGet();
+                        currentPage.incrementAndGet();
+                        if (jobDoc != null) {
+                            jobDoc.setCurrentPage(currentPage.get());
+                            ocrJobDocumentRepository.save(jobDoc);
+                        }
+                    }
+                    case OcrStreamEvent.Done done -> {
+                        // Snap the persisted page counter to the total announced by the Start event.
+                        if (jobDoc != null) {
+                            jobDoc.setCurrentPage(totalPages.get());
+                            ocrJobDocumentRepository.save(jobDoc);
+                        }
+                    }
+                }
+            });
 
             job.setStatus(OcrJobStatus.DONE);
             job.setProcessedDocuments(1);
-            updateProgress(job, "DONE:" + blocks.size());
+            updateProgress(job, "DONE:" + blockCounter.get() + ":" + skippedPages.get());
             if (jobDoc != null) {
                 jobDoc.setStatus(OcrDocumentStatus.DONE);
                 ocrJobDocumentRepository.save(jobDoc);
diff --git a/backend/src/test/java/org/raddatz/familienarchiv/service/OcrAsyncRunnerTest.java b/backend/src/test/java/org/raddatz/familienarchiv/service/OcrAsyncRunnerTest.java
index e4275378..018befc9 100644
--- a/backend/src/test/java/org/raddatz/familienarchiv/service/OcrAsyncRunnerTest.java
+++ b/backend/src/test/java/org/raddatz/familienarchiv/service/OcrAsyncRunnerTest.java
@@ -12,9 +12,11 @@ import org.raddatz.familienarchiv.repository.OcrJobDocumentRepository;
 import org.raddatz.familienarchiv.repository.OcrJobRepository;
 import org.raddatz.familienarchiv.repository.TranscriptionBlockRepository;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -103,15 +105,25 @@ class OcrAsyncRunnerTest {
         UUID docId = UUID.randomUUID();
         UUID userId = UUID.randomUUID();
         OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
+        OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
+                .jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
         Document doc = Document.builder().id(docId).filePath("test.pdf")
                 .fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
 
         when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
         when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
+                .thenReturn(Optional.of(jobDoc));
         when(documentService.getDocumentById(docId)).thenReturn(doc);
         when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
         when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
-        when(ocrClient.extractBlocks(any(), any())).thenReturn(List.of());
+        doAnswer(inv -> {
+            Consumer<OcrStreamEvent> handler = inv.getArgument(2);
+            handler.accept(new OcrStreamEvent.Start(1));
+            handler.accept(new OcrStreamEvent.Page(0, List.of()));
+            handler.accept(new OcrStreamEvent.Done(0, 0));
+            return null;
+        }).when(ocrClient).streamBlocks(any(), any(), any());
 
         ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
 
@@ -124,19 +136,143 @@ class OcrAsyncRunnerTest {
         UUID docId = UUID.randomUUID();
         UUID userId = UUID.randomUUID();
         OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
+        OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
+                .jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
         Document doc = Document.builder().id(docId).filePath("test.pdf")
                 .fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
 
         when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
         when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
+                .thenReturn(Optional.of(jobDoc));
         when(documentService.getDocumentById(docId)).thenReturn(doc);
         when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
         when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
-        when(ocrClient.extractBlocks(any(), any())).thenThrow(new RuntimeException("OCR failed"));
+        doThrow(new RuntimeException("OCR failed")).when(ocrClient).streamBlocks(any(), any(), any());
 
         ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
 
         assertThat(job.getStatus()).isEqualTo(OcrJobStatus.FAILED);
         assertThat(job.getErrorCount()).isEqualTo(1);
     }
+
+    // Progress format under test: ANALYZING_PAGE:<pagesDone>:<totalPages>:<blocksSoFar>, then DONE:<blocks>:<skipped>.
+    @Test
+    void runSingleDocument_updatesProgressPerPage() {
+        UUID jobId = UUID.randomUUID();
+        UUID docId = UUID.randomUUID();
+        UUID userId = UUID.randomUUID();
+        OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
+        OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
+                .jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
+        Document doc = Document.builder().id(docId).filePath("test.pdf")
+                .fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
+        DocumentAnnotation ann = DocumentAnnotation.builder().id(UUID.randomUUID()).build();
+
+        when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
+        when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
+                .thenReturn(Optional.of(jobDoc));
+        when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(documentService.getDocumentById(docId)).thenReturn(doc);
+        when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
+        when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
+        when(annotationService.createOcrAnnotation(any(), any(), any(), any(), any())).thenReturn(ann);
+
+        List<String> progressMessages = new ArrayList<>();
+        doAnswer(inv -> {
+            Consumer<OcrStreamEvent> handler = inv.getArgument(2);
+            handler.accept(new OcrStreamEvent.Start(3));
+            handler.accept(new OcrStreamEvent.Page(0, List.of(
+                    new OcrBlockResult(0, 0.1, 0.1, 0.8, 0.04, null, "L1"),
+                    new OcrBlockResult(0, 0.1, 0.2, 0.8, 0.04, null, "L2"))));
+            progressMessages.add(job.getProgressMessage());
+            handler.accept(new OcrStreamEvent.Page(1, List.of(
+                    new OcrBlockResult(1, 0.1, 0.1, 0.8, 0.04, null, "L3"))));
+            progressMessages.add(job.getProgressMessage());
+            handler.accept(new OcrStreamEvent.Done(3, 0));
+            return null;
+        }).when(ocrClient).streamBlocks(any(), any(), any());
+
+        ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
+
+        assertThat(progressMessages.get(0)).isEqualTo("ANALYZING_PAGE:1:3:2");
+        assertThat(progressMessages.get(1)).isEqualTo("ANALYZING_PAGE:2:3:3");
+        assertThat(job.getProgressMessage()).isEqualTo("DONE:3:0");
+    }
+
+    // A page-level Error event is tallied as skipped; the job still finishes as DONE.
+    @Test
+    void runSingleDocument_includesSkippedPagesInDoneMessage() {
+        UUID jobId = UUID.randomUUID();
+        UUID docId = UUID.randomUUID();
+        UUID userId = UUID.randomUUID();
+        OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
+        OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
+                .jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
+        Document doc = Document.builder().id(docId).filePath("test.pdf")
+                .fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
+
+        when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
+        when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
+                .thenReturn(Optional.of(jobDoc));
+        when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(documentService.getDocumentById(docId)).thenReturn(doc);
+        when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
+        when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
+
+        doAnswer(inv -> {
+            Consumer<OcrStreamEvent> handler = inv.getArgument(2);
+            handler.accept(new OcrStreamEvent.Start(3));
+            handler.accept(new OcrStreamEvent.Page(0, List.of()));
+            handler.accept(new OcrStreamEvent.Error(1, "failed"));
+            handler.accept(new OcrStreamEvent.Page(2, List.of()));
+            handler.accept(new OcrStreamEvent.Done(0, 1));
+            return null;
+        }).when(ocrClient).streamBlocks(any(), any(), any());
+
+        ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
+
+        assertThat(job.getStatus()).isEqualTo(OcrJobStatus.DONE);
+        assertThat(job.getProgressMessage()).isEqualTo("DONE:0:1");
+    }
+
+    // NOTE: only job status and progress text are asserted here; the WARN log output itself is not captured.
+    @Test
+    void runSingleDocument_logsStreamErrorAtWarnWithoutSettingJobFailed() {
+        UUID jobId = UUID.randomUUID();
+        UUID docId = UUID.randomUUID();
+        UUID userId = UUID.randomUUID();
+        OcrJob job = OcrJob.builder().id(jobId).totalDocuments(1).status(OcrJobStatus.PENDING).build();
+        OcrJobDocument jobDoc = OcrJobDocument.builder().id(UUID.randomUUID())
+                .jobId(jobId).documentId(docId).status(OcrDocumentStatus.PENDING).build();
+        Document doc = Document.builder().id(docId).filePath("test.pdf")
+                .fileHash("hash").scriptType(ScriptType.TYPEWRITER).build();
+
+        when(ocrJobRepository.findById(jobId)).thenReturn(Optional.of(job));
+        when(ocrJobRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(ocrJobDocumentRepository.findByJobIdAndDocumentId(jobId, docId))
+                .thenReturn(Optional.of(jobDoc));
+        when(ocrJobDocumentRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
+        when(documentService.getDocumentById(docId)).thenReturn(doc);
+        when(transcriptionService.listBlocks(docId)).thenReturn(List.of());
+        when(fileService.generatePresignedUrl(any())).thenReturn("http://presigned");
+
+        doAnswer(inv -> {
+            Consumer<OcrStreamEvent> handler = inv.getArgument(2);
+            handler.accept(new OcrStreamEvent.Start(2));
+            handler.accept(new OcrStreamEvent.Error(0, "some python traceback details"));
+            handler.accept(new OcrStreamEvent.Page(1, List.of()));
+            handler.accept(new OcrStreamEvent.Done(0, 1));
+            return null;
+        }).when(ocrClient).streamBlocks(any(), any(), any());
+
+        ocrAsyncRunner.runSingleDocument(jobId, docId, userId);
+
+        // Job should still be DONE, not FAILED (per-page errors don't fail the whole job)
+        assertThat(job.getStatus()).isEqualTo(OcrJobStatus.DONE);
+        // Raw error message should not leak to progress
+        assertThat(job.getProgressMessage()).doesNotContain("python traceback");
+    }
 }