]> juplo.de Git - demos/kafka/training/commitdiff
ROT: Erwartungen für Standard-Header definiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:15:12 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 20 Mar 2026 15:21:15 +0000 (16:21 +0100)
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 26a1e8139231b0d6d59cce74598beed75d917613..56d25ee80c0d313efca69b692e8455eb14924f78 100644 (file)
@@ -24,6 +24,10 @@ import java.util.stream.IntStream;
 @Slf4j
 public class DeadLetterConsumer implements Runnable
 {
+  public final static String KEY = "KEY";
+  public final static String TIMESTAMP = "TIMESTAMP";
+
+
   private final String id;
   private final String topic;
   private final int numPartitions;
@@ -228,6 +232,11 @@ public class DeadLetterConsumer implements Runnable
     return Mono.fromFuture(future);
   }
 
+  String prefixed(String headerName)
+  {
+    return headerName;
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} - Requesting shutdown", id);
index 0d9b45a906b27d4ca68c943da77ee291fc9e4ded..7b8f3a6a33067f1871091642585b5e7aaa28078e 100644 (file)
@@ -26,6 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -72,6 +73,10 @@ public class ApplicationTests
       recordMetadata.offset());
     assertThat(response.getStatusCode())
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
+    assertThat(response.getHeaders().getValuesAsList(DeadLetterConsumer.KEY))
+      .containsExactly(key);
+    assertThat(response.getHeaders().getValuesAsList(DeadLetterConsumer.TIMESTAMP))
+      .containsExactly(Long.toString(recordMetadata.timestamp()));
     assertThat(response.getBody())
       .isEqualTo(value);
   }
@@ -123,6 +128,8 @@ public class ApplicationTests
   AdminClient adminClient;
   @Autowired
   TestRestTemplate restTemplate;
+  @Autowired
+  DeadLetterConsumer deadLetterConsumer;
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];