From f08dbb60207ff3af9f275c19bf4fe424bc370323 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Apr 2025 22:16:37 +0200 Subject: [PATCH] =?utf8?q?GR=C3=9CN:=20Verhalten=20f=C3=BCr=20Standard-Hea?= =?utf8?q?der=20korrigiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 5 +++-- .../de/juplo/kafka/ApplicationProperties.java | 9 +++------ .../de/juplo/kafka/DeadLetterConsumer.java | 11 +++++++---- .../de/juplo/kafka/DeadLetterController.java | 18 ++++++++++++++++-- src/main/java/de/juplo/kafka/FetchRequest.java | 3 ++- src/main/resources/application.yml | 1 + 6 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3284ca6..8b11dad 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -24,7 +24,8 @@ public class ApplicationConfiguration return new DeadLetterConsumer( properties.getClientId(), - properties.getConsumerProperties().getTopic(), + properties.getConsumer().getTopic(), + properties.getConsumer().getHeaderPrefix(), kafkaConsumer, () -> applicationContext.close()); } @@ -36,7 +37,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); + props.put("group.id", properties.getConsumer().getGroupId()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "none"); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 9d2511b..ee3d107 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -25,12 +25,6 @@ public class ApplicationProperties private ConsumerProperties consumer; - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - @Validated @Getter @Setter @@ -42,5 +36,8 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + @NotNull + @NotEmpty + private String headerPrefix; } } diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 56d25ee..b8a32c5 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -30,6 +30,7 @@ public class DeadLetterConsumer implements Runnable private final String id; private final String topic; + private final String headerPrefix; private final int numPartitions; private final Queue[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; @@ -43,11 +44,13 @@ public class DeadLetterConsumer implements Runnable public DeadLetterConsumer( String clientId, String topic, + String headerPrefix, Consumer consumer, Runnable closeCallback) { this.id = clientId; this.topic = topic; + this.headerPrefix = headerPrefix; this.consumer = consumer; numPartitions = consumer.partitionsFor(topic).size(); @@ -103,7 +106,7 @@ public class DeadLetterConsumer implements Runnable FetchRequest fetchRequest = currentFetchRequest[record.partition()]; - fetchRequest.future().complete(record.value()); + fetchRequest.future().complete(record); schedulePendingFetchRequest(record.partition()).ifPresentOrElse( (nextFetchRequest) -> scheduleFetchRequest(nextFetchRequest), () -> @@ -206,14 +209,14 @@ public class DeadLetterConsumer implements Runnable currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest; consumer.seek(fetchRequest.partition(), fetchRequest.offset()); } - Mono requestRecord(int partition, long offset) + Mono> requestRecord(int partition, long offset) { if (partition >= numPartitions || partition < 0) { throw new NonExistentPartitionException(topic, partition); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); FetchRequest fetchRequest = new FetchRequest( new TopicPartition(topic, partition), @@ -234,7 +237,7 @@ public class DeadLetterConsumer implements Runnable String prefixed(String headerName) { - return headerName; + return headerPrefix + headerName; } public void shutdown() throws InterruptedException diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index ba59c30..64553ea 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -4,9 +4,13 @@ import de.juplo.kafka.exceptions.NonExistentPartitionException; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import org.springframework.web.util.UriUtils; import reactor.core.publisher.Mono; +import java.nio.charset.StandardCharsets; + @RestController public class DeadLetterController @@ -16,11 +20,21 @@ public class DeadLetterController @GetMapping(path = "/{partition}/{offset}") - public Mono recordAtOffset( + public Mono> recordAtOffset( @PathVariable int partition, @PathVariable long offset) { - return deadLetterConsumer.requestRecord(partition, offset); + return deadLetterConsumer + .requestRecord(partition, offset) + .map(record -> ResponseEntity + .ok() + .header( + deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), + UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8)) + .header( + deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), + Long.toString(record.timestamp())) + .body(record.value())); } @ResponseStatus(value= HttpStatus.NOT_FOUND) diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java index 394e2c9..ff186b7 100644 --- a/src/main/java/de/juplo/kafka/FetchRequest.java +++ b/src/main/java/de/juplo/kafka/FetchRequest.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import java.util.concurrent.CompletableFuture; @@ -8,7 +9,7 @@ import java.util.concurrent.CompletableFuture; public record FetchRequest( TopicPartition partition, long offset, - CompletableFuture future) + CompletableFuture> future) { @Override public String toString() diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0c4e07f..31bec84 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ juplo: consumer: group-id: nodlt topic: test + header-prefix: X-RECORD- management: endpoint: shutdown: -- 2.20.1