From f45bebfeb621d7126be3ab47676a09501da3fc59 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 5 Apr 2025 13:09:01 +0200 Subject: [PATCH] =?utf8?q?noDLT=20implementiert:=20Service,=20=C3=BCber=20?= =?utf8?q?der=20einzelne=20Nachrichten=20wiederherstellt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 16 +- docker/docker-compose.yml | 9 + pom.xml | 10 +- .../juplo/kafka/ApplicationConfiguration.java | 13 +- .../de/juplo/kafka/ApplicationController.java | 24 +++ .../de/juplo/kafka/ApplicationProperties.java | 6 - .../de/juplo/kafka/DeadLetterConsumer.java | 166 ++++++++++++++---- .../java/de/juplo/kafka/FetchRequest.java | 18 ++ src/main/resources/application.yml | 6 +- 9 files changed, 202 insertions(+), 66 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/ApplicationController.java create mode 100644 src/main/java/de/juplo/kafka/FetchRequest.java diff --git a/README.sh b/README.sh index b46e235..98dcff5 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/nodlt:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml rm -svf nodlt if [[ $(docker image ls -q $IMAGE) == "" || @@ -25,15 +25,11 @@ fi docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 +docker compose -f docker/docker-compose.yml up -d producer consumer nodlt -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer +while ! [[ $(http 0:8881/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for nodlt..."; sleep 1; done -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer -sleep 5 +http -v :8881/0/0 +http -v :8881/1/3 docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9984f6b..081cd2f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -160,6 +160,15 @@ services: juplo.client-id: ute juplo.consumer.topic: test + nodlt: + image: juplo/nodlt:1.0-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: ute + juplo.consumer.topic: test + ports: + - 8881:8881 + volumes: zookeeper-data: zookeeper-log: diff --git a/pom.xml b/pom.xml index dd96d00..21bd07f 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,10 @@ de.juplo.kafka - spring-consumer - Spring Consumer - Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + nodlt + NoDLT + A standalone consumer, that fetches messages for specific offsets + 1.0-SNAPSHOT 21 @@ -24,7 +24,7 @@ org.springframework.boot - spring-boot-starter-web + spring-boot-starter-webflux org.springframework.boot diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5730a45..3284ca6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -16,7 +16,7 @@ import java.util.Properties; public class ApplicationConfiguration { @Bean - public DeadLetterConsumer exampleConsumer( + public DeadLetterConsumer deadLetterConsumer( Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) @@ -33,20 +33,13 @@ public class ApplicationConfiguration public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.max.age.ms", 5000); // 5 Sekunden props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("auto.offset.reset", "none"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java new file mode 100644 index 0000000..e526f82 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationController.java @@ -0,0 +1,24 @@ +package de.juplo.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + + +@RestController +public class ApplicationController +{ + @Autowired + DeadLetterConsumer deadLetterConsumer; + + + @GetMapping(path = "/{partition}/{offset}") + public Mono recordAtOffset( + @PathVariable int partition, + @PathVariable long offset) + { + return deadLetterConsumer.requestRecord(partition, offset); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9..9d2511b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,8 +7,6 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; -import java.time.Duration; - @ConfigurationProperties(prefix = "juplo") @Validated @@ -44,9 +42,5 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } } } diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 21a0e9c..54192a5 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -4,10 +4,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Slf4j @@ -15,11 +24,14 @@ public class DeadLetterConsumer implements Runnable { private final String id; private final String topic; + private final int numPartitions; + private final Queue[] pendingFetchRequests; + private final FetchRequest[] currentFetchRequest; private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; - private long consumed = 0; + private boolean shutdownIsRequested = false; public DeadLetterConsumer( @@ -32,7 +44,14 @@ public class DeadLetterConsumer implements Runnable this.topic = topic; this.consumer = consumer; - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + numPartitions = consumer.partitionsFor(topic).size(); + pendingFetchRequests = IntStream + .range(0, numPartitions) + .mapToObj(info -> new ConcurrentLinkedQueue()) + .toArray(size -> new Queue[size]); + currentFetchRequest = new FetchRequest[numPartitions]; + + workerThread = new Thread(this, clientId + "-worker-thread"); workerThread.start(); this.closeCallback = closeCallback; @@ -44,29 +63,76 @@ public class DeadLetterConsumer implements Runnable { try { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + List partitions = IntStream + .range(0, pendingFetchRequests.length) + .mapToObj(i -> new TopicPartition(topic, i)) + .peek(partition -> log.info("{} - Assigning to {}", id, partition)) + .collect(Collectors.toList()); - while (true) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + consumer.assign(partitions); + consumer.pause(partitions); - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + // Without this, the first call to poll() triggers an NoOffsetForPartitionException, if the topic is empty + partitions.forEach(partition -> consumer.seek(partition, 0)); + + while (!shutdownIsRequested) + { + try { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + + log.info("{} - Received {} messages", id, records.count()); + List partitionsToPause = new LinkedList<>(); + for (TopicPartition partition : records.partitions()) + { + for (ConsumerRecord record : records.records(partition)) + { + log.info( + "{} - fetched partition={}-{}, offset={}: {}", + id, + topic, + record.partition(), + record.offset(), + record.key()); + + FetchRequest fetchRequest = currentFetchRequest[record.partition()]; + + fetchRequest.future().complete(record.value()); + schedulePendingFetchRequest(record.partition()).ifPresentOrElse( + (nextFetchRequest) -> + { + scheduleFetchRequest(nextFetchRequest); + }, + () -> + { + log.info("{} - no pending fetch-requests for {}", id, partition); + currentFetchRequest[record.partition()] = null; + partitionsToPause.add(fetchRequest.partition()); + }); + + break; + } + } + + consumer.pause(partitionsToPause); + } + catch(WakeupException e) + { + log.info("{} - Consumer was awakened", id); + + List partitionsToResume = new LinkedList<>(); + + for (int partition = 0; partition < numPartitions; partition++) + { + schedulePendingFetchRequest(partition) + .map(fetchRequest -> fetchRequest.partition()) + .ifPresent(topicPartition -> partitionsToResume.add(topicPartition)); + } + + consumer.resume(partitionsToResume); } } } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } catch(Exception e) { log.error("{} - Unexpected error, unsubscribing!", id, e); @@ -78,25 +144,65 @@ public class DeadLetterConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + log.info("{} - Exiting!", id); } } - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) + private Optional schedulePendingFetchRequest(int partition) { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + if (currentFetchRequest[partition] == null) + { + FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll(); + if (nextFetchRequest != null) + { + scheduleFetchRequest(nextFetchRequest); + return Optional.of(nextFetchRequest); + } + else + { + log.trace("{} - no pending fetch-request for partition {}.", id, partition); + } + } + else + { + log.debug("{} - fetch-request {} is still in progress.", id, currentFetchRequest[partition]); + } + + return Optional.empty(); } + private void scheduleFetchRequest(FetchRequest fetchRequest) + { + log.debug("{} - scheduling fetch-request {}.", id, fetchRequest); + + currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest; + consumer.seek(fetchRequest.partition(), fetchRequest.offset()); + } + Mono requestRecord(int partition, long offset) + { + CompletableFuture future = new CompletableFuture<>(); + + FetchRequest fetchRequest = new FetchRequest( + new TopicPartition(topic, partition), + offset, + future); + + pendingFetchRequests[partition].add(fetchRequest); + + log.info( + "{} - fetch-request for partition={}, offset={}: Waking up consumer!", + id, + partition, + offset); + consumer.wakeup(); + + return Mono.fromFuture(future); + } public void shutdown() throws InterruptedException { - log.info("{} - Waking up the consumer", id); + log.info("{} - Requesting shutdown", id); + shutdownIsRequested = true; consumer.wakeup(); log.info("{} - Joining the worker thread", id); workerThread.join(); diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java new file mode 100644 index 0000000..394e2c9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/FetchRequest.java @@ -0,0 +1,18 @@ +package de.juplo.kafka; + +import org.apache.kafka.common.TopicPartition; + +import java.util.concurrent.CompletableFuture; + + +public record FetchRequest( + TopicPartition partition, + long offset, + CompletableFuture future) +{ + @Override + public String toString() + { + return partition + "@" + offset; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731..0c4e07f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,10 +2,8 @@ juplo: bootstrap-server: :9092 client-id: DEV consumer: - group-id: my-group + group-id: nodlt topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s management: endpoint: shutdown: @@ -26,8 +24,6 @@ info: consumer: group-id: ${juplo.consumer.group-id} topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: level: root: INFO -- 2.20.1