]> juplo.de Git - demos/kafka/training/commitdiff
ROT: Erwartungen für Standard-Header definiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:15:12 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:47:38 +0000 (21:47 +0100)
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 26a1e8139231b0d6d59cce74598beed75d917613..56d25ee80c0d313efca69b692e8455eb14924f78 100644 (file)
@@ -24,6 +24,10 @@ import java.util.stream.IntStream;
 @Slf4j
 public class DeadLetterConsumer implements Runnable
 {
+  public final static String KEY = "KEY";
+  public final static String TIMESTAMP = "TIMESTAMP";
+
+
   private final String id;
   private final String topic;
   private final int numPartitions;
@@ -228,6 +232,11 @@ public class DeadLetterConsumer implements Runnable
     return Mono.fromFuture(future);
   }
 
+  String prefixed(String headerName)
+  {
+    return headerName;
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} - Requesting shutdown", id);
index 7671132eba273d806b7a3ecf84a61d460a1e1b21..5af61b6dc1e214a31ae4b58b969f5c081f6502d7 100644 (file)
@@ -28,6 +28,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.annotation.DirtiesContext;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -76,6 +77,14 @@ public class ApplicationTests
       recordMetadata.offset());
     assertThat(response.getStatusCode())
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
+    assertThat(response.getHeaders().toSingleValueMap())
+      .containsEntry(
+        deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+        key);
+    assertThat(response.getHeaders().toSingleValueMap())
+      .containsEntry(
+        deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+        Long.toString(recordMetadata.timestamp()));
     assertThat(response.getBody())
       .isEqualTo(value);
   }
@@ -127,6 +136,8 @@ public class ApplicationTests
   AdminClient adminClient;
   @Autowired
   TestRestTemplate restTemplate;
+  @Autowired
+  DeadLetterConsumer deadLetterConsumer;
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];