ROT: Erwartungen für Standard-Header definiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:15:12 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 26a1e81..56d25ee 100644 (file)
@@ -24,6 +24,10 @@ import java.util.stream.IntStream;
 @Slf4j
 public class DeadLetterConsumer implements Runnable
 {
+  public final static String KEY = "KEY";
+  public final static String TIMESTAMP = "TIMESTAMP";
+
+
   private final String id;
   private final String topic;
   private final int numPartitions;
@@ -228,6 +232,11 @@ public class DeadLetterConsumer implements Runnable
     return Mono.fromFuture(future);
   }
 
+  String prefixed(String headerName)
+  {
+    return headerName;
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} - Requesting shutdown", id);
index def547c..4c7c8b3 100644 (file)
@@ -26,6 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -72,6 +73,14 @@ public class ApplicationTests
       recordMetadata.offset());
     assertThat(response.getStatusCode())
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
+    assertThat(response.getHeaders())
+      .containsEntry(
+        deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+        List.of(key));
+    assertThat(response.getHeaders())
+      .containsEntry(
+        deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+        List.of(Long.toString(recordMetadata.timestamp())));
     assertThat(response.getBody())
       .isEqualTo(value);
   }
@@ -123,6 +132,8 @@ public class ApplicationTests
   AdminClient adminClient;
   @Autowired
   TestRestTemplate restTemplate;
+  @Autowired
+  DeadLetterConsumer deadLetterConsumer;
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];