From: Kai Moritz Date: Mon, 7 Apr 2025 20:15:12 +0000 (+0200) Subject: ROT: Erwartungen für Standard-Header definiert X-Git-Tag: consumer/nodlt--2026-03-20--19-06~12 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=078da23910fd986dfd0243ddb6110b142391db78;p=demos%2Fkafka%2Ftraining ROT: Erwartungen für Standard-Header definiert --- diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 113fdc2b..9eb2f8be 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -25,6 +25,10 @@ import java.util.stream.IntStream; @Slf4j public class DeadLetterConsumer implements Runnable, SmartLifecycle { + public final static String KEY = "KEY"; + public final static String TIMESTAMP = "TIMESTAMP"; + + private final String id; private final String topic; private final int numPartitions; @@ -234,6 +238,11 @@ public class DeadLetterConsumer implements Runnable, SmartLifecycle return Mono.fromFuture(future); } + String prefixed(String headerName) + { + return headerName; + } + @Override public boolean isRunning() { diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 0d9b45a9..7b8f3a6a 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -26,6 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -72,6 +73,10 @@ public class ApplicationTests recordMetadata.offset()); assertThat(response.getStatusCode()) .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); + assertThat(response.getHeaders().getValuesAsList(DeadLetterConsumer.KEY)) + .containsExactly(key); + assertThat(response.getHeaders().getValuesAsList(DeadLetterConsumer.TIMESTAMP)) + .containsExactly(Long.toString(recordMetadata.timestamp())); assertThat(response.getBody()) .isEqualTo(value); } @@ -123,6 +128,8 @@ public class ApplicationTests AdminClient adminClient; @Autowired TestRestTemplate restTemplate; + @Autowired + DeadLetterConsumer deadLetterConsumer; final long[] currentOffsets = new long[NUM_PARTITIONS];