From: Kai Moritz Date: Tue, 8 Apr 2025 05:20:27 +0000 (+0200) Subject: Refactor: Das Durchsetzen des Header-Prefix ist Aufgabe des Controllers X-Git-Tag: consumer/nodlt--2026-03-20~4 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=124f2141b08605250e7a360cf8d0b2a607b79bdb;p=demos%2Fkafka%2Ftraining Refactor: Das Durchsetzen des Header-Prefix ist Aufgabe des Controllers --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d0c18671..6212e57b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,7 +26,6 @@ public class ApplicationConfiguration new DeadLetterConsumer( properties.getClientId(), properties.getConsumer().getTopic(), - properties.getConsumer().getHeaderPrefix(), kafkaConsumer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 626db844..38d1b9c4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -39,9 +39,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String headerPrefix; } @Validated @@ -52,5 +49,8 @@ public class ApplicationProperties @NotNull @NotEmpty private MediaType mediaType; + @NotNull + @NotEmpty + private String headerPrefix; } } diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index a19cd3f5..b7b9b7a0 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -24,13 +24,8 @@ import java.util.stream.IntStream; @Slf4j public class DeadLetterConsumer implements Runnable { - public final static String KEY = "KEY"; - public final static String TIMESTAMP = "TIMESTAMP"; - - private final String id; private final String topic; - private final String headerPrefix; private final int numPartitions; private final Queue[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; @@ -44,13 +39,11 @@ public class DeadLetterConsumer implements Runnable public DeadLetterConsumer( String clientId, String topic, - String headerPrefix, Consumer consumer, Runnable closeCallback) { this.id = clientId; this.topic = topic; - this.headerPrefix = headerPrefix; this.consumer = consumer; numPartitions = consumer.partitionsFor(topic).size(); @@ -235,11 +228,6 @@ public class DeadLetterConsumer implements Runnable return Mono.fromFuture(future); } - String prefixed(String headerName) - { - return headerPrefix + headerName; - } - public void shutdown() throws InterruptedException { log.info("{} - Requesting shutdown", id); diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index 6d84e84f..a0e0b1cb 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -15,8 +15,14 @@ import java.nio.charset.StandardCharsets; @RestController public class DeadLetterController { + public final static String KEY = "KEY"; + public final static String TIMESTAMP = "TIMESTAMP"; + + private final DeadLetterConsumer deadLetterConsumer; private final MediaType mediaType; + private final String headerPrefix; + public DeadLetterController( @@ -25,6 +31,7 @@ public class DeadLetterController { this.deadLetterConsumer = deadLetterConsumer; this.mediaType = properties.getController().getMediaType(); + this.headerPrefix = properties.getController().getHeaderPrefix(); } @@ -39,14 +46,20 @@ public class DeadLetterController .ok() .contentType(mediaType) .header( - deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + prefixed(KEY), UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8)) .header( - deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + prefixed(TIMESTAMP), Long.toString(record.timestamp())) .body(record.value())); } + String prefixed(String headerName) + { + return headerPrefix + headerName; + } + + @ResponseStatus(value= HttpStatus.NOT_FOUND) @ExceptionHandler(OffsetOutOfRangeException.class) public void notFound(OffsetOutOfRangeException e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 652cc9c1..8afc71e6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,8 +4,8 @@ juplo: consumer: group-id: nodlt topic: test - header-prefix: X-RECORD- controller: + header-prefix: X-RECORD- media-type: application/json management: endpoint: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 7549f0a1..6c07c061 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -43,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat; properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.consumer.topic=" + TOPIC, - "juplo.consumer.header-prefix=" + HEADER_PREFIX, + "juplo.controller.header-prefix=" + HEADER_PREFIX, "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", @@ -105,9 +105,9 @@ public class ApplicationTests assertThat(response.getStatusCode()) .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); - assertThat(response.getHeaders().getValuesAsList(HEADER_PREFIX + DeadLetterConsumer.KEY)) + assertThat(response.getHeaders().getValuesAsList(HEADER_PREFIX + DeadLetterController.KEY)) .containsExactly(key); - assertThat(response.getHeaders().getValuesAsList(HEADER_PREFIX + DeadLetterConsumer.TIMESTAMP)) + assertThat(response.getHeaders().getValuesAsList(HEADER_PREFIX + DeadLetterController.TIMESTAMP)) .containsExactly(Long.toString(recordMetadata.timestamp())); assertThat(response.getBody()) .isEqualTo(value);