From: Kai Moritz Date: Mon, 7 Apr 2025 20:15:12 +0000 (+0200) Subject: ROT: Erwartungen für Standard-Header definiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5d9c5d289de7bf14b02ac0c46804c359e6878930;p=demos%2Fkafka%2Ftraining ROT: Erwartungen für Standard-Header definiert --- diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 26a1e81..56d25ee 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -24,6 +24,10 @@ import java.util.stream.IntStream; @Slf4j public class DeadLetterConsumer implements Runnable { + public final static String KEY = "KEY"; + public final static String TIMESTAMP = "TIMESTAMP"; + + private final String id; private final String topic; private final int numPartitions; @@ -228,6 +232,11 @@ public class DeadLetterConsumer implements Runnable return Mono.fromFuture(future); } + String prefixed(String headerName) + { + return headerName; + } + public void shutdown() throws InterruptedException { log.info("{} - Requesting shutdown", id); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index def547c..4c7c8b3 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -26,6 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -72,6 +73,14 @@ public class ApplicationTests recordMetadata.offset()); assertThat(response.getStatusCode()) .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); + assertThat(response.getHeaders()) + .containsEntry( + deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + List.of(key)); + assertThat(response.getHeaders()) + .containsEntry( + deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + List.of(Long.toString(recordMetadata.timestamp()))); assertThat(response.getBody()) .isEqualTo(value); } @@ -123,6 +132,8 @@ public class ApplicationTests AdminClient adminClient; @Autowired TestRestTemplate restTemplate; + @Autowired + DeadLetterConsumer deadLetterConsumer; final long[] currentOffsets = new long[NUM_PARTITIONS];