From: Kai Moritz Date: Mon, 7 Apr 2025 20:15:12 +0000 (+0200) Subject: ROT: Erwartungen für Standard-Header definiert X-Git-Tag: consumer/nodlt--2026-03-22--22-01~12 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=b42c3bc12e7d3972f9959557961d838f2226f1df;p=demos%2Fkafka%2Ftraining ROT: Erwartungen für Standard-Header definiert --- diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 26a1e813..56d25ee8 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -24,6 +24,10 @@ import java.util.stream.IntStream; @Slf4j public class DeadLetterConsumer implements Runnable { + public final static String KEY = "KEY"; + public final static String TIMESTAMP = "TIMESTAMP"; + + private final String id; private final String topic; private final int numPartitions; @@ -228,6 +232,11 @@ public class DeadLetterConsumer implements Runnable return Mono.fromFuture(future); } + String prefixed(String headerName) + { + return headerName; + } + public void shutdown() throws InterruptedException { log.info("{} - Requesting shutdown", id); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 7671132e..5af61b6d 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -28,6 +28,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -76,6 +77,14 @@ public class ApplicationTests recordMetadata.offset()); assertThat(response.getStatusCode()) .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); + assertThat(response.getHeaders().toSingleValueMap()) + .containsEntry( + deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + key); + assertThat(response.getHeaders().toSingleValueMap()) + .containsEntry( + deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + Long.toString(recordMetadata.timestamp())); assertThat(response.getBody()) .isEqualTo(value); } @@ -127,6 +136,8 @@ public class ApplicationTests AdminClient adminClient; @Autowired TestRestTemplate restTemplate; + @Autowired + DeadLetterConsumer deadLetterConsumer; final long[] currentOffsets = new long[NUM_PARTITIONS];