feat(ocr): implement NDJSON streaming in RestClientOcrClient
Add streamBlocks() that POSTs to /ocr/stream and parses the NDJSON response line by line with a dedicated ObjectMapper. Falls back to the old /ocr endpoint via the default method when /ocr/stream returns 404. Uses a separate HttpClient with 5-minute request timeout for streaming. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
package org.raddatz.familienarchiv.service;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.raddatz.familienarchiv.model.ScriptType;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@@ -10,18 +14,34 @@ import org.springframework.http.client.JdkClientHttpRequestFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.client.RestClient;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RestClientOcrClient implements OcrClient, OcrHealthClient {
|
||||
|
||||
private static final ObjectMapper NDJSON_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
|
||||
|
||||
private final RestClient restClient;
|
||||
private final HttpClient streamingHttpClient;
|
||||
private final String baseUrl;
|
||||
|
||||
public RestClientOcrClient(@Value("${app.ocr.base-url:http://ocr-service:8000}") String baseUrl) {
|
||||
this.baseUrl = baseUrl;
|
||||
|
||||
HttpClient httpClient = HttpClient.newBuilder()
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(Duration.ofSeconds(10))
|
||||
@@ -33,6 +53,11 @@ public class RestClientOcrClient implements OcrClient, OcrHealthClient {
|
||||
.baseUrl(baseUrl)
|
||||
.requestFactory(requestFactory)
|
||||
.build();
|
||||
|
||||
this.streamingHttpClient = HttpClient.newBuilder()
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(Duration.ofSeconds(10))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -70,6 +95,82 @@ public class RestClientOcrClient implements OcrClient, OcrHealthClient {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamBlocks(String pdfUrl, ScriptType scriptType, Consumer<OcrStreamEvent> handler) {
|
||||
String body;
|
||||
try {
|
||||
body = NDJSON_MAPPER.writeValueAsString(Map.of(
|
||||
"pdfUrl", pdfUrl,
|
||||
"scriptType", scriptType.name(),
|
||||
"language", "de"));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to serialize OCR request", e);
|
||||
}
|
||||
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(baseUrl + "/ocr/stream"))
|
||||
.header("Content-Type", "application/json")
|
||||
.POST(HttpRequest.BodyPublishers.ofString(body))
|
||||
.timeout(Duration.ofMinutes(5))
|
||||
.build();
|
||||
|
||||
try {
|
||||
HttpResponse<InputStream> response = streamingHttpClient.send(
|
||||
request, HttpResponse.BodyHandlers.ofInputStream());
|
||||
|
||||
if (response.statusCode() == 404) {
|
||||
log.info("OCR service does not support /ocr/stream (404), falling back to /ocr");
|
||||
OcrClient.super.streamBlocks(pdfUrl, scriptType, handler);
|
||||
return;
|
||||
}
|
||||
|
||||
try (InputStream inputStream = response.body()) {
|
||||
parseNdjsonStream(inputStream, handler);
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new RuntimeException("NDJSON stream failed: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
static void parseNdjsonStream(InputStream inputStream, Consumer<OcrStreamEvent> handler) {
|
||||
try (BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (line.isBlank()) continue;
|
||||
|
||||
JsonNode node = NDJSON_MAPPER.readTree(line);
|
||||
String type = node.path("type").asText();
|
||||
|
||||
switch (type) {
|
||||
case "start" -> handler.accept(
|
||||
new OcrStreamEvent.Start(node.path("totalPages").asInt()));
|
||||
case "page" -> {
|
||||
int pageNumber = node.path("pageNumber").asInt();
|
||||
List<OcrBlockResult> blocks = NDJSON_MAPPER.convertValue(
|
||||
node.path("blocks"),
|
||||
new TypeReference<>() {});
|
||||
handler.accept(new OcrStreamEvent.Page(pageNumber, blocks));
|
||||
}
|
||||
case "error" -> handler.accept(
|
||||
new OcrStreamEvent.Error(
|
||||
node.path("pageNumber").asInt(),
|
||||
node.path("message").asText()));
|
||||
case "done" -> handler.accept(
|
||||
new OcrStreamEvent.Done(
|
||||
node.path("totalBlocks").asInt(),
|
||||
node.path("skippedPages").asInt()));
|
||||
default -> log.debug("Ignoring unknown NDJSON event type: {}", type);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to parse NDJSON stream: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
record OcrBlockJson(
|
||||
@JsonProperty("pageNumber") int pageNumber,
|
||||
double x,
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
package org.raddatz.familienarchiv.service;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class RestClientOcrClientStreamTest {
|
||||
|
||||
@Test
|
||||
void parseNdjsonStream_dispatchesStartPageDoneInOrder() {
|
||||
String ndjson = """
|
||||
{"type":"start","totalPages":2}
|
||||
{"type":"page","pageNumber":0,"blocks":[{"pageNumber":0,"x":0.1,"y":0.2,"width":0.8,"height":0.1,"polygon":null,"text":"Line 1"}]}
|
||||
{"type":"page","pageNumber":1,"blocks":[{"pageNumber":1,"x":0.1,"y":0.3,"width":0.8,"height":0.1,"polygon":null,"text":"Line 2"}]}
|
||||
{"type":"done","totalBlocks":2,"skippedPages":0}
|
||||
""";
|
||||
InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
List<OcrStreamEvent> events = new ArrayList<>();
|
||||
RestClientOcrClient.parseNdjsonStream(stream, events::add);
|
||||
|
||||
assertThat(events).hasSize(4);
|
||||
assertThat(events.get(0)).isInstanceOf(OcrStreamEvent.Start.class);
|
||||
assertThat(((OcrStreamEvent.Start) events.get(0)).totalPages()).isEqualTo(2);
|
||||
|
||||
assertThat(events.get(1)).isInstanceOf(OcrStreamEvent.Page.class);
|
||||
var page0 = (OcrStreamEvent.Page) events.get(1);
|
||||
assertThat(page0.pageNumber()).isEqualTo(0);
|
||||
assertThat(page0.blocks()).hasSize(1);
|
||||
assertThat(page0.blocks().get(0).text()).isEqualTo("Line 1");
|
||||
|
||||
assertThat(events.get(2)).isInstanceOf(OcrStreamEvent.Page.class);
|
||||
var page1 = (OcrStreamEvent.Page) events.get(2);
|
||||
assertThat(page1.pageNumber()).isEqualTo(1);
|
||||
|
||||
assertThat(events.get(3)).isInstanceOf(OcrStreamEvent.Done.class);
|
||||
var done = (OcrStreamEvent.Done) events.get(3);
|
||||
assertThat(done.totalBlocks()).isEqualTo(2);
|
||||
assertThat(done.skippedPages()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseNdjsonStream_parsesErrorEvents() {
|
||||
String ndjson = """
|
||||
{"type":"start","totalPages":3}
|
||||
{"type":"page","pageNumber":0,"blocks":[]}
|
||||
{"type":"error","pageNumber":1,"message":"OCR processing failed on page 1"}
|
||||
{"type":"page","pageNumber":2,"blocks":[]}
|
||||
{"type":"done","totalBlocks":0,"skippedPages":1}
|
||||
""";
|
||||
InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
List<OcrStreamEvent> events = new ArrayList<>();
|
||||
RestClientOcrClient.parseNdjsonStream(stream, events::add);
|
||||
|
||||
assertThat(events).hasSize(5);
|
||||
assertThat(events.get(2)).isInstanceOf(OcrStreamEvent.Error.class);
|
||||
var error = (OcrStreamEvent.Error) events.get(2);
|
||||
assertThat(error.pageNumber()).isEqualTo(1);
|
||||
assertThat(error.message()).contains("OCR processing failed");
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseNdjsonStream_skipsBlankLines() {
|
||||
String ndjson = """
|
||||
{"type":"start","totalPages":1}
|
||||
|
||||
{"type":"page","pageNumber":0,"blocks":[]}
|
||||
|
||||
{"type":"done","totalBlocks":0,"skippedPages":0}
|
||||
""";
|
||||
InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
List<OcrStreamEvent> events = new ArrayList<>();
|
||||
RestClientOcrClient.parseNdjsonStream(stream, events::add);
|
||||
|
||||
assertThat(events).hasSize(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseNdjsonStream_ignoresUnknownEventTypes() {
|
||||
String ndjson = """
|
||||
{"type":"start","totalPages":1}
|
||||
{"type":"unknown","foo":"bar"}
|
||||
{"type":"done","totalBlocks":0,"skippedPages":0}
|
||||
""";
|
||||
InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
List<OcrStreamEvent> events = new ArrayList<>();
|
||||
RestClientOcrClient.parseNdjsonStream(stream, events::add);
|
||||
|
||||
assertThat(events).hasSize(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseNdjsonStream_parsesPageWithPolygon() {
|
||||
String ndjson = """
|
||||
{"type":"start","totalPages":1}
|
||||
{"type":"page","pageNumber":0,"blocks":[{"pageNumber":0,"x":0.1,"y":0.2,"width":0.8,"height":0.1,"polygon":[[0.1,0.2],[0.9,0.2],[0.9,0.3],[0.1,0.3]],"text":"With polygon"}]}
|
||||
{"type":"done","totalBlocks":1,"skippedPages":0}
|
||||
""";
|
||||
InputStream stream = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
List<OcrStreamEvent> events = new ArrayList<>();
|
||||
RestClientOcrClient.parseNdjsonStream(stream, events::add);
|
||||
|
||||
var page = (OcrStreamEvent.Page) events.get(1);
|
||||
assertThat(page.blocks().get(0).polygon()).hasSize(4);
|
||||
assertThat(page.blocks().get(0).text()).isEqualTo("With polygon");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user