From 93c3154b3c0fdcb85079dbf17612bf0e6ad7ee81 Mon Sep 17 00:00:00 2001 From: Marcel Date: Mon, 13 Apr 2026 10:03:12 +0200 Subject: [PATCH] feat(ocr): implement NDJSON streaming in RestClientOcrClient Add streamBlocks() that POSTs to /ocr/stream and parses the NDJSON response line by line with a dedicated ObjectMapper. Falls back to the old /ocr endpoint via the default method when /ocr/stream returns 404. Uses a separate HttpClient with a 5-minute request timeout for streaming. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service/RestClientOcrClient.java | 101 +++++++++++++++ .../RestClientOcrClientStreamTest.java | 117 ++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 backend/src/test/java/org/raddatz/familienarchiv/service/RestClientOcrClientStreamTest.java diff --git a/backend/src/main/java/org/raddatz/familienarchiv/service/RestClientOcrClient.java b/backend/src/main/java/org/raddatz/familienarchiv/service/RestClientOcrClient.java index 1130e67e..a0f7ccf3 100644 --- a/backend/src/main/java/org/raddatz/familienarchiv/service/RestClientOcrClient.java +++ b/backend/src/main/java/org/raddatz/familienarchiv/service/RestClientOcrClient.java @@ -1,6 +1,10 @@ package org.raddatz.familienarchiv.service; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.raddatz.familienarchiv.model.ScriptType; import org.springframework.beans.factory.annotation.Value; @@ -10,18 +14,34 @@ import org.springframework.http.client.JdkClientHttpRequestFactory; import org.springframework.stereotype.Component; import org.springframework.web.client.RestClient; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; 
import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.function.Consumer; @Component @Slf4j public class RestClientOcrClient implements OcrClient, OcrHealthClient { + private static final ObjectMapper NDJSON_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + private final RestClient restClient; + private final HttpClient streamingHttpClient; + private final String baseUrl; public RestClientOcrClient(@Value("${app.ocr.base-url:http://ocr-service:8000}") String baseUrl) { + this.baseUrl = baseUrl; + HttpClient httpClient = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) .connectTimeout(Duration.ofSeconds(10)) @@ -33,6 +53,11 @@ public class RestClientOcrClient implements OcrClient, OcrHealthClient { .baseUrl(baseUrl) .requestFactory(requestFactory) .build(); + + this.streamingHttpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(10)) + .build(); } @Override @@ -70,6 +95,82 @@ public class RestClientOcrClient implements OcrClient, OcrHealthClient { } } + @Override + public void streamBlocks(String pdfUrl, ScriptType scriptType, Consumer handler) { + String body; + try { + body = NDJSON_MAPPER.writeValueAsString(Map.of( + "pdfUrl", pdfUrl, + "scriptType", scriptType.name(), + "language", "de")); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize OCR request", e); + } + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/ocr/stream")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(body)) + .timeout(Duration.ofMinutes(5)) + .build(); + + try { + HttpResponse response = streamingHttpClient.send( + request, HttpResponse.BodyHandlers.ofInputStream()); + + if 
(response.statusCode() == 404) { + log.info("OCR service does not support /ocr/stream (404), falling back to /ocr"); + OcrClient.super.streamBlocks(pdfUrl, scriptType, handler); + return; + } + + try (InputStream inputStream = response.body()) { + parseNdjsonStream(inputStream, handler); + } + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException("NDJSON stream failed: " + e.getMessage(), e); + } + } + + static void parseNdjsonStream(InputStream inputStream, Consumer handler) { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + if (line.isBlank()) continue; + + JsonNode node = NDJSON_MAPPER.readTree(line); + String type = node.path("type").asText(); + + switch (type) { + case "start" -> handler.accept( + new OcrStreamEvent.Start(node.path("totalPages").asInt())); + case "page" -> { + int pageNumber = node.path("pageNumber").asInt(); + List blocks = NDJSON_MAPPER.convertValue( + node.path("blocks"), + new TypeReference<>() {}); + handler.accept(new OcrStreamEvent.Page(pageNumber, blocks)); + } + case "error" -> handler.accept( + new OcrStreamEvent.Error( + node.path("pageNumber").asInt(), + node.path("message").asText())); + case "done" -> handler.accept( + new OcrStreamEvent.Done( + node.path("totalBlocks").asInt(), + node.path("skippedPages").asInt())); + default -> log.debug("Ignoring unknown NDJSON event type: {}", type); + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to parse NDJSON stream: " + e.getMessage(), e); + } + } + record OcrBlockJson( @JsonProperty("pageNumber") int pageNumber, double x, diff --git a/backend/src/test/java/org/raddatz/familienarchiv/service/RestClientOcrClientStreamTest.java b/backend/src/test/java/org/raddatz/familienarchiv/service/RestClientOcrClientStreamTest.java new file 
mode 100644 index 00000000..36d5db22 --- /dev/null +++ b/backend/src/test/java/org/raddatz/familienarchiv/service/RestClientOcrClientStreamTest.java @@ -0,0 +1,117 @@ +package org.raddatz.familienarchiv.service; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class RestClientOcrClientStreamTest { + + @Test + void parseNdjsonStream_dispatchesStartPageDoneInOrder() { + String ndjson = """ + {"type":"start","totalPages":2} + {"type":"page","pageNumber":0,"blocks":[{"pageNumber":0,"x":0.1,"y":0.2,"width":0.8,"height":0.1,"polygon":null,"text":"Line 1"}]} + {"type":"page","pageNumber":1,"blocks":[{"pageNumber":1,"x":0.1,"y":0.3,"width":0.8,"height":0.1,"polygon":null,"text":"Line 2"}]} + {"type":"done","totalBlocks":2,"skippedPages":0} + """; + InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8)); + + List events = new ArrayList<>(); + RestClientOcrClient.parseNdjsonStream(stream, events::add); + + assertThat(events).hasSize(4); + assertThat(events.get(0)).isInstanceOf(OcrStreamEvent.Start.class); + assertThat(((OcrStreamEvent.Start) events.get(0)).totalPages()).isEqualTo(2); + + assertThat(events.get(1)).isInstanceOf(OcrStreamEvent.Page.class); + var page0 = (OcrStreamEvent.Page) events.get(1); + assertThat(page0.pageNumber()).isEqualTo(0); + assertThat(page0.blocks()).hasSize(1); + assertThat(page0.blocks().get(0).text()).isEqualTo("Line 1"); + + assertThat(events.get(2)).isInstanceOf(OcrStreamEvent.Page.class); + var page1 = (OcrStreamEvent.Page) events.get(2); + assertThat(page1.pageNumber()).isEqualTo(1); + + assertThat(events.get(3)).isInstanceOf(OcrStreamEvent.Done.class); + var done = (OcrStreamEvent.Done) events.get(3); + assertThat(done.totalBlocks()).isEqualTo(2); + assertThat(done.skippedPages()).isEqualTo(0); 
+ } + + @Test + void parseNdjsonStream_parsesErrorEvents() { + String ndjson = """ + {"type":"start","totalPages":3} + {"type":"page","pageNumber":0,"blocks":[]} + {"type":"error","pageNumber":1,"message":"OCR processing failed on page 1"} + {"type":"page","pageNumber":2,"blocks":[]} + {"type":"done","totalBlocks":0,"skippedPages":1} + """; + InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8)); + + List events = new ArrayList<>(); + RestClientOcrClient.parseNdjsonStream(stream, events::add); + + assertThat(events).hasSize(5); + assertThat(events.get(2)).isInstanceOf(OcrStreamEvent.Error.class); + var error = (OcrStreamEvent.Error) events.get(2); + assertThat(error.pageNumber()).isEqualTo(1); + assertThat(error.message()).contains("OCR processing failed"); + } + + @Test + void parseNdjsonStream_skipsBlankLines() { + String ndjson = """ + {"type":"start","totalPages":1} + + {"type":"page","pageNumber":0,"blocks":[]} + + {"type":"done","totalBlocks":0,"skippedPages":0} + """; + InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8)); + + List events = new ArrayList<>(); + RestClientOcrClient.parseNdjsonStream(stream, events::add); + + assertThat(events).hasSize(3); + } + + @Test + void parseNdjsonStream_ignoresUnknownEventTypes() { + String ndjson = """ + {"type":"start","totalPages":1} + {"type":"unknown","foo":"bar"} + {"type":"done","totalBlocks":0,"skippedPages":0} + """; + InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8)); + + List events = new ArrayList<>(); + RestClientOcrClient.parseNdjsonStream(stream, events::add); + + assertThat(events).hasSize(2); + } + + @Test + void parseNdjsonStream_parsesPageWithPolygon() { + String ndjson = """ + {"type":"start","totalPages":1} + {"type":"page","pageNumber":0,"blocks":[{"pageNumber":0,"x":0.1,"y":0.2,"width":0.8,"height":0.1,"polygon":[[0.1,0.2],[0.9,0.2],[0.9,0.3],[0.1,0.3]],"text":"With polygon"}]} + 
{"type":"done","totalBlocks":1,"skippedPages":0} + """; + InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8)); + + List events = new ArrayList<>(); + RestClientOcrClient.parseNdjsonStream(stream, events::add); + + var page = (OcrStreamEvent.Page) events.get(1); + assertThat(page.blocks().get(0).polygon()).hasSize(4); + assertThat(page.blocks().get(0).text()).isEqualTo("With polygon"); + } +}