GRÜN: Verhalten für Standard-Header korrigiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:16:37 +0000 (22:16 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/DeadLetterController.java
src/main/java/de/juplo/kafka/FetchRequest.java
src/main/resources/application.yml

index 3284ca6..8b11dad 100644 (file)
@@ -24,7 +24,8 @@ public class ApplicationConfiguration
     return
       new DeadLetterConsumer(
         properties.getClientId(),
-        properties.getConsumerProperties().getTopic(),
+        properties.getConsumer().getTopic(),
+        properties.getConsumer().getHeaderPrefix(),
         kafkaConsumer,
         () -> applicationContext.close());
   }
@@ -36,7 +37,7 @@ public class ApplicationConfiguration
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
-    props.put("group.id", properties.getConsumerProperties().getGroupId());
+    props.put("group.id", properties.getConsumer().getGroupId());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
     props.put("auto.offset.reset", "none");
index 9d2511b..ee3d107 100644 (file)
@@ -25,12 +25,6 @@ public class ApplicationProperties
   private ConsumerProperties consumer;
 
 
-  public ConsumerProperties getConsumerProperties()
-  {
-    return consumer;
-  }
-
-
   @Validated
   @Getter
   @Setter
@@ -42,5 +36,8 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
+    @NotNull
+    @NotEmpty
+    private String headerPrefix;
   }
 }
index 56d25ee..b8a32c5 100644 (file)
@@ -30,6 +30,7 @@ public class DeadLetterConsumer implements Runnable
 
   private final String id;
   private final String topic;
+  private final String headerPrefix;
   private final int numPartitions;
   private final Queue<FetchRequest>[] pendingFetchRequests;
   private final FetchRequest[] currentFetchRequest;
@@ -43,11 +44,13 @@ public class DeadLetterConsumer implements Runnable
   public DeadLetterConsumer(
     String clientId,
     String topic,
+    String headerPrefix,
     Consumer<String, String> consumer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
+    this.headerPrefix = headerPrefix;
     this.consumer = consumer;
 
     numPartitions = consumer.partitionsFor(topic).size();
@@ -103,7 +106,7 @@ public class DeadLetterConsumer implements Runnable
 
               FetchRequest fetchRequest = currentFetchRequest[record.partition()];
 
-              fetchRequest.future().complete(record.value());
+              fetchRequest.future().complete(record);
               schedulePendingFetchRequest(record.partition()).ifPresentOrElse(
                 (nextFetchRequest) -> scheduleFetchRequest(nextFetchRequest),
                 () ->
@@ -206,14 +209,14 @@ public class DeadLetterConsumer implements Runnable
     currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
     consumer.seek(fetchRequest.partition(), fetchRequest.offset());
   }
-  Mono<String> requestRecord(int partition, long offset)
+  Mono<ConsumerRecord<String, String>> requestRecord(int partition, long offset)
   {
     if (partition >= numPartitions || partition < 0)
     {
       throw new NonExistentPartitionException(topic, partition);
     }
 
-    CompletableFuture<String> future = new CompletableFuture<>();
+    CompletableFuture<ConsumerRecord<String, String>> future = new CompletableFuture<>();
 
     FetchRequest fetchRequest = new FetchRequest(
       new TopicPartition(topic, partition),
@@ -234,7 +237,7 @@ public class DeadLetterConsumer implements Runnable
 
   String prefixed(String headerName)
   {
-    return headerName;
+    return headerPrefix + headerName;
   }
 
   public void shutdown() throws InterruptedException
index ba59c30..64553ea 100644 (file)
@@ -4,9 +4,13 @@ import de.juplo.kafka.exceptions.NonExistentPartitionException;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
+import org.springframework.web.util.UriUtils;
 import reactor.core.publisher.Mono;
 
+import java.nio.charset.StandardCharsets;
+
 
 @RestController
 public class DeadLetterController
@@ -16,11 +20,21 @@ public class DeadLetterController
 
 
   @GetMapping(path = "/{partition}/{offset}")
-  public Mono<String> recordAtOffset(
+  public Mono<ResponseEntity<String>> recordAtOffset(
     @PathVariable int partition,
     @PathVariable long offset)
   {
-    return deadLetterConsumer.requestRecord(partition, offset);
+    return deadLetterConsumer
+      .requestRecord(partition, offset)
+      .map(record -> ResponseEntity
+        .ok()
+        .header(
+          deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+          UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8))
+        .header(
+          deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+          Long.toString(record.timestamp()))
+        .body(record.value()));
   }
 
   @ResponseStatus(value= HttpStatus.NOT_FOUND)
index 394e2c9..ff186b7 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.concurrent.CompletableFuture;
@@ -8,7 +9,7 @@ import java.util.concurrent.CompletableFuture;
 public record FetchRequest(
   TopicPartition partition,
   long offset,
-  CompletableFuture<String> future)
+  CompletableFuture<ConsumerRecord<String, String>> future)
 {
   @Override
   public String toString()
index 0c4e07f..31bec84 100644 (file)
@@ -4,6 +4,7 @@ juplo:
   consumer:
     group-id: nodlt
     topic: test
+    header-prefix: X-RECORD-
 management:
   endpoint:
     shutdown: