From b2407a2ab932f2b5b8706e044bc88c8cf65be50d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 8 Apr 2025 07:20:27 +0200 Subject: [PATCH] Refactor: Das Durchsetzen des Header-Prefix ist Aufgabe des Controllers --- .../juplo/kafka/ApplicationConfiguration.java | 1 - .../de/juplo/kafka/ApplicationProperties.java | 6 +++--- .../java/de/juplo/kafka/DeadLetterConsumer.java | 12 ------------ .../de/juplo/kafka/DeadLetterController.java | 17 +++++++++++++++-- src/main/resources/application.yml | 2 +- .../java/de/juplo/kafka/ApplicationTests.java | 6 +++--- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3feee95..79a062c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,7 +26,6 @@ public class ApplicationConfiguration new DeadLetterConsumer( properties.getClientId(), properties.getConsumer().getTopic(), - properties.getConsumer().getHeaderPrefix(), kafkaConsumer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 626db84..38d1b9c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -39,9 +39,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String headerPrefix; } @Validated @@ -52,5 +49,8 @@ public class ApplicationProperties @NotNull @NotEmpty private MediaType mediaType; + @NotNull + @NotEmpty + private String headerPrefix; } } diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index a19cd3f..b7b9b7a 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -24,13 +24,8 @@ import java.util.stream.IntStream; @Slf4j public class DeadLetterConsumer implements Runnable { - public final static String KEY = "KEY"; - public final static String TIMESTAMP = "TIMESTAMP"; - - private final String id; private final String topic; - private final String headerPrefix; private final int numPartitions; private final Queue[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; @@ -44,13 +39,11 @@ public class DeadLetterConsumer implements Runnable public DeadLetterConsumer( String clientId, String topic, - String headerPrefix, Consumer consumer, Runnable closeCallback) { this.id = clientId; this.topic = topic; - this.headerPrefix = headerPrefix; this.consumer = consumer; numPartitions = consumer.partitionsFor(topic).size(); @@ -235,11 +228,6 @@ public class DeadLetterConsumer implements Runnable return Mono.fromFuture(future); } - String prefixed(String headerName) - { - return headerPrefix + headerName; - } - public void shutdown() throws InterruptedException { log.info("{} - Requesting shutdown", id); diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index 6d84e84..a0e0b1c 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -15,8 +15,14 @@ import java.nio.charset.StandardCharsets; @RestController public class DeadLetterController { + public final static String KEY = "KEY"; + public final static String TIMESTAMP = "TIMESTAMP"; + + private final DeadLetterConsumer deadLetterConsumer; private final MediaType mediaType; + private final String headerPrefix; + public DeadLetterController( @@ -25,6 +31,7 @@ public class DeadLetterController { this.deadLetterConsumer = deadLetterConsumer; this.mediaType = properties.getController().getMediaType(); + this.headerPrefix = properties.getController().getHeaderPrefix(); } @@ -39,14 +46,20 @@ public class DeadLetterController .ok() .contentType(mediaType) .header( - deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + prefixed(KEY), UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8)) .header( - deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + prefixed(TIMESTAMP), Long.toString(record.timestamp())) .body(record.value())); } + String prefixed(String headerName) + { + return headerPrefix + headerName; + } + + @ResponseStatus(value= HttpStatus.NOT_FOUND) @ExceptionHandler(OffsetOutOfRangeException.class) public void notFound(OffsetOutOfRangeException e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 652cc9c..8afc71e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,8 +4,8 @@ juplo: consumer: group-id: nodlt topic: test - header-prefix: X-RECORD- controller: + header-prefix: X-RECORD- media-type: application/json management: endpoint: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index f26ee16..1545131 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -43,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat; properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.consumer.topic=" + TOPIC, - "juplo.consumer.header-prefix=" + HEADER_PREFIX, + "juplo.controller.header-prefix=" + HEADER_PREFIX, "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", @@ -107,11 +107,11 @@ public class ApplicationTests .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); assertThat(response.getHeaders()) .containsEntry( - HEADER_PREFIX + DeadLetterConsumer.KEY, + HEADER_PREFIX + DeadLetterController.KEY, List.of(key)); assertThat(response.getHeaders()) .containsEntry( - HEADER_PREFIX + DeadLetterConsumer.TIMESTAMP, + HEADER_PREFIX + DeadLetterController.TIMESTAMP, List.of(Long.toString(recordMetadata.timestamp()))); assertThat(response.getBody()) .isEqualTo(value); -- 2.20.1