From: Kai Moritz <kai@juplo.de> Date: Mon, 7 Apr 2025 20:16:37 +0000 (+0200) Subject: GRÜN: Verhalten für Standard-Header korrigiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f08dbb60207ff3af9f275c19bf4fe424bc370323;p=demos%2Fkafka%2Ftraining GRÜN: Verhalten für Standard-Header korrigiert --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3284ca6..8b11dad 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -24,7 +24,8 @@ public class ApplicationConfiguration return new DeadLetterConsumer( properties.getClientId(), - properties.getConsumerProperties().getTopic(), + properties.getConsumer().getTopic(), + properties.getConsumer().getHeaderPrefix(), kafkaConsumer, () -> applicationContext.close()); } @@ -36,7 +37,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); + props.put("group.id", properties.getConsumer().getGroupId()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "none"); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 9d2511b..ee3d107 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -25,12 +25,6 @@ public class ApplicationProperties private ConsumerProperties consumer; - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - @Validated @Getter @Setter @@ -42,5 +36,8 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + @NotNull + @NotEmpty + private String headerPrefix; } } diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 56d25ee..b8a32c5 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -30,6 +30,7 @@ public class DeadLetterConsumer implements Runnable private final String id; private final String topic; + private final String headerPrefix; private final int numPartitions; private final Queue<FetchRequest>[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; @@ -43,11 +44,13 @@ public class DeadLetterConsumer implements Runnable public DeadLetterConsumer( String clientId, String topic, + String headerPrefix, Consumer<String, String> consumer, Runnable closeCallback) { this.id = clientId; this.topic = topic; + this.headerPrefix = headerPrefix; this.consumer = consumer; numPartitions = consumer.partitionsFor(topic).size(); @@ -103,7 +106,7 @@ public class DeadLetterConsumer implements Runnable FetchRequest fetchRequest = currentFetchRequest[record.partition()]; - fetchRequest.future().complete(record.value()); + fetchRequest.future().complete(record); schedulePendingFetchRequest(record.partition()).ifPresentOrElse( (nextFetchRequest) -> scheduleFetchRequest(nextFetchRequest), () -> @@ -206,14 +209,14 @@ public class DeadLetterConsumer implements Runnable currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest; consumer.seek(fetchRequest.partition(), fetchRequest.offset()); } - Mono<String> requestRecord(int partition, long offset) + Mono<ConsumerRecord<String, String>> requestRecord(int partition, long offset) { if (partition >= numPartitions || partition < 0) { throw new NonExistentPartitionException(topic, partition); } - CompletableFuture<String> future = new CompletableFuture<>(); + CompletableFuture<ConsumerRecord<String, String>> future = new CompletableFuture<>(); FetchRequest fetchRequest = new FetchRequest( new TopicPartition(topic, partition), @@ -234,7 +237,7 @@ public class DeadLetterConsumer implements Runnable String prefixed(String headerName) { - return headerName; + return headerPrefix + headerName; } public void shutdown() throws InterruptedException diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index ba59c30..64553ea 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -4,9 +4,13 @@ import de.juplo.kafka.exceptions.NonExistentPartitionException; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import org.springframework.web.util.UriUtils; import reactor.core.publisher.Mono; +import java.nio.charset.StandardCharsets; + @RestController public class DeadLetterController @@ -16,11 +20,21 @@ public class DeadLetterController @GetMapping(path = "/{partition}/{offset}") - public Mono<String> recordAtOffset( + public Mono<ResponseEntity<String>> recordAtOffset( @PathVariable int partition, @PathVariable long offset) { - return deadLetterConsumer.requestRecord(partition, offset); + return deadLetterConsumer + .requestRecord(partition, offset) + .map(record -> ResponseEntity + .ok() + .header( + deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8)) + .header( + deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + Long.toString(record.timestamp())) + .body(record.value())); } @ResponseStatus(value= HttpStatus.NOT_FOUND) diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java index 394e2c9..ff186b7 100644 --- a/src/main/java/de/juplo/kafka/FetchRequest.java +++ b/src/main/java/de/juplo/kafka/FetchRequest.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import java.util.concurrent.CompletableFuture; @@ -8,7 +9,7 @@ import java.util.concurrent.CompletableFuture; public record FetchRequest( TopicPartition partition, long offset, - CompletableFuture<String> future) + CompletableFuture<ConsumerRecord<String, String>> future) { @Override public String toString() diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0c4e07f..31bec84 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ juplo: consumer: group-id: nodlt topic: test + header-prefix: X-RECORD- management: endpoint: shutdown: