Refactor: Das Durchsetzen des Header-Prefix ist Aufgabe des Controllers
authorKai Moritz <kai@juplo.de>
Tue, 8 Apr 2025 05:20:27 +0000 (07:20 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/DeadLetterController.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 3feee95..79a062c 100644 (file)
@@ -26,7 +26,6 @@ public class ApplicationConfiguration
       new DeadLetterConsumer(
         properties.getClientId(),
         properties.getConsumer().getTopic(),
-        properties.getConsumer().getHeaderPrefix(),
         kafkaConsumer,
         () -> applicationContext.close());
   }
index 626db84..38d1b9c 100644 (file)
@@ -39,9 +39,6 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
-    @NotNull
-    @NotEmpty
-    private String headerPrefix;
   }
 
   @Validated
@@ -52,5 +49,8 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private MediaType mediaType;
+    @NotNull
+    @NotEmpty
+    private String headerPrefix;
   }
 }
index a19cd3f..b7b9b7a 100644 (file)
@@ -24,13 +24,8 @@ import java.util.stream.IntStream;
 @Slf4j
 public class DeadLetterConsumer implements Runnable
 {
-  public final static String KEY = "KEY";
-  public final static String TIMESTAMP = "TIMESTAMP";
-
-
   private final String id;
   private final String topic;
-  private final String headerPrefix;
   private final int numPartitions;
   private final Queue<FetchRequest>[] pendingFetchRequests;
   private final FetchRequest[] currentFetchRequest;
@@ -44,13 +39,11 @@ public class DeadLetterConsumer implements Runnable
   public DeadLetterConsumer(
     String clientId,
     String topic,
-    String headerPrefix,
     Consumer<byte[], byte[]> consumer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
-    this.headerPrefix = headerPrefix;
     this.consumer = consumer;
 
     numPartitions = consumer.partitionsFor(topic).size();
@@ -235,11 +228,6 @@ public class DeadLetterConsumer implements Runnable
     return Mono.fromFuture(future);
   }
 
-  String prefixed(String headerName)
-  {
-    return headerPrefix + headerName;
-  }
-
   public void shutdown() throws InterruptedException
   {
     log.info("{} - Requesting shutdown", id);
index 6d84e84..a0e0b1c 100644 (file)
@@ -15,8 +15,14 @@ import java.nio.charset.StandardCharsets;
 @RestController
 public class DeadLetterController
 {
+  public final static String KEY = "KEY";
+  public final static String TIMESTAMP = "TIMESTAMP";
+
+
   private final DeadLetterConsumer deadLetterConsumer;
   private final MediaType mediaType;
+  private final String headerPrefix;
+
 
 
   public DeadLetterController(
@@ -25,6 +31,7 @@ public class DeadLetterController
   {
     this.deadLetterConsumer = deadLetterConsumer;
     this.mediaType = properties.getController().getMediaType();
+    this.headerPrefix = properties.getController().getHeaderPrefix();
   }
 
 
@@ -39,14 +46,20 @@ public class DeadLetterController
         .ok()
         .contentType(mediaType)
         .header(
-          deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+          prefixed(KEY),
           UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8))
         .header(
-          deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+          prefixed(TIMESTAMP),
           Long.toString(record.timestamp()))
         .body(record.value()));
   }
 
+  String prefixed(String headerName)
+  {
+    return headerPrefix + headerName;
+  }
+
+
   @ResponseStatus(value= HttpStatus.NOT_FOUND)
   @ExceptionHandler(OffsetOutOfRangeException.class)
   public void notFound(OffsetOutOfRangeException e)
index 652cc9c..8afc71e 100644 (file)
@@ -4,8 +4,8 @@ juplo:
   consumer:
     group-id: nodlt
     topic: test
-    header-prefix: X-RECORD-
   controller:
+    header-prefix: X-RECORD-
     media-type: application/json
 management:
   endpoint:
index f26ee16..1545131 100644 (file)
@@ -43,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat;
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "juplo.consumer.topic=" + TOPIC,
-    "juplo.consumer.header-prefix=" + HEADER_PREFIX,
+    "juplo.controller.header-prefix=" + HEADER_PREFIX,
     "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
@@ -107,11 +107,11 @@ public class ApplicationTests
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
     assertThat(response.getHeaders())
       .containsEntry(
-        HEADER_PREFIX + DeadLetterConsumer.KEY,
+        HEADER_PREFIX + DeadLetterController.KEY,
         List.of(key));
     assertThat(response.getHeaders())
       .containsEntry(
-        HEADER_PREFIX + DeadLetterConsumer.TIMESTAMP,
+        HEADER_PREFIX + DeadLetterController.TIMESTAMP,
         List.of(Long.toString(recordMetadata.timestamp())));
     assertThat(response.getBody())
       .isEqualTo(value);