noDLT implementiert: Service, über den einzelne Nachrichten wiederhergestellt werden
authorKai Moritz <kai@juplo.de>
Sat, 5 Apr 2025 11:09:01 +0000 (13:09 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:10 +0000 (20:14 +0200)
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/FetchRequest.java [new file with mode: 0644]
src/main/resources/application.yml

index b46e235..98dcff5 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/nodlt:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf nodlt
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -25,15 +25,11 @@ fi
 
 docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
+docker compose -f docker/docker-compose.yml up -d producer consumer nodlt
 
-docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
+while ! [[ $(http 0:8881/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for nodlt..."; sleep 1; done
 
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
+http -v :8881/0/0
+http -v :8881/1/3
 
 docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
index 9984f6b..081cd2f 100644 (file)
@@ -160,6 +160,15 @@ services:
       juplo.client-id: ute
       juplo.consumer.topic: test
 
+  nodlt:
+    image: juplo/nodlt:1.0-SNAPSHOT
+    environment:
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: ute
+      juplo.consumer.topic: test
+    ports:
+      - 8881:8881
+
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index dd96d00..21bd07f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>spring-consumer</artifactId>
-  <name>Spring Consumer</name>
-  <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <artifactId>nodlt</artifactId>
+  <name>NoDLT</name>
+  <description>A standalone consumer that fetches messages for specific offsets</description>
+  <version>1.0-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
@@ -24,7 +24,7 @@
   <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-web</artifactId>
+      <artifactId>spring-boot-starter-webflux</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
index 5730a45..3284ca6 100644 (file)
@@ -16,7 +16,7 @@ import java.util.Properties;
 public class ApplicationConfiguration
 {
   @Bean
-  public DeadLetterConsumer exampleConsumer(
+  public DeadLetterConsumer deadLetterConsumer(
     Consumer<String, String> kafkaConsumer,
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
@@ -33,20 +33,13 @@ public class ApplicationConfiguration
   public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
+
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
     props.put("group.id", properties.getConsumerProperties().getGroupId());
-    if (properties.getConsumerProperties().getAutoOffsetReset() != null)
-    {
-      props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
-    }
-    if (properties.getConsumerProperties().getAutoCommitInterval() != null)
-    {
-      props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
-    }
-    props.put("metadata.max.age.ms", 5000); //  5 Sekunden
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("auto.offset.reset", "none");
 
     return new KafkaConsumer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java
new file mode 100644 (file)
index 0000000..e526f82
--- /dev/null
@@ -0,0 +1,24 @@
+package de.juplo.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+
+@RestController
+public class ApplicationController
+{
+  @Autowired
+  DeadLetterConsumer deadLetterConsumer;
+
+
+  @GetMapping(path = "/{partition}/{offset}")
+  public Mono<String> recordAtOffset(
+    @PathVariable int partition,
+    @PathVariable long offset)
+  {
+    return deadLetterConsumer.requestRecord(partition, offset);
+  }
+}
index c8193c9..9d2511b 100644 (file)
@@ -7,8 +7,6 @@ import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.validation.annotation.Validated;
 
-import java.time.Duration;
-
 
 @ConfigurationProperties(prefix = "juplo")
 @Validated
@@ -44,9 +42,5 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
-    private OffsetReset autoOffsetReset;
-    private Duration autoCommitInterval;
-
-    enum OffsetReset { latest, earliest, none }
   }
 }
index 21a0e9c..54192a5 100644 (file)
@@ -4,10 +4,19 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Mono;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 
 @Slf4j
@@ -15,11 +24,14 @@ public class DeadLetterConsumer implements Runnable
 {
   private final String id;
   private final String topic;
+  private final int numPartitions;
+  private final Queue<FetchRequest>[] pendingFetchRequests;
+  private final FetchRequest[] currentFetchRequest;
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
-  private long consumed = 0;
+  private boolean shutdownIsRequested = false;
 
 
   public DeadLetterConsumer(
@@ -32,7 +44,14 @@ public class DeadLetterConsumer implements Runnable
     this.topic = topic;
     this.consumer = consumer;
 
-    workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
+    numPartitions = consumer.partitionsFor(topic).size();
+    pendingFetchRequests = IntStream
+      .range(0, numPartitions)
+      .mapToObj(info -> new ConcurrentLinkedQueue<String>())
+      .toArray(size -> new Queue[size]);
+    currentFetchRequest = new FetchRequest[numPartitions];
+
+    workerThread = new Thread(this, clientId + "-worker-thread");
     workerThread.start();
 
     this.closeCallback = closeCallback;
@@ -44,29 +63,76 @@ public class DeadLetterConsumer implements Runnable
   {
     try
     {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      List<TopicPartition> partitions = IntStream
+        .range(0, pendingFetchRequests.length)
+        .mapToObj(i -> new TopicPartition(topic, i))
+        .peek(partition -> log.info("{} - Assigning to {}", id, partition))
+        .collect(Collectors.toList());
 
-      while (true)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+      consumer.assign(partitions);
+      consumer.pause(partitions);
 
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
+      // Without this, the first call to poll() triggers a NoOffsetForPartitionException if the topic is empty
+      partitions.forEach(partition -> consumer.seek(partition, 0));
+
+      while (!shutdownIsRequested)
+      {
+        try
         {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
+          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
+
+          log.info("{} - Received {} messages", id, records.count());
+          List<TopicPartition> partitionsToPause = new LinkedList<>();
+          for (TopicPartition partition : records.partitions())
+          {
+            for (ConsumerRecord<String, String> record : records.records(partition))
+            {
+              log.info(
+                "{} - fetched partition={}-{}, offset={}: {}",
+                id,
+                topic,
+                record.partition(),
+                record.offset(),
+                record.key());
+
+              FetchRequest fetchRequest = currentFetchRequest[record.partition()];
+
+              fetchRequest.future().complete(record.value());
+              schedulePendingFetchRequest(record.partition()).ifPresentOrElse(
+                (nextFetchRequest) ->
+                {
+                  scheduleFetchRequest(nextFetchRequest);
+                },
+                () ->
+                {
+                  log.info("{} - no pending fetch-requests for {}", id, partition);
+                  currentFetchRequest[record.partition()] = null;
+                  partitionsToPause.add(fetchRequest.partition());
+                });
+
+              break;
+            }
+          }
+
+          consumer.pause(partitionsToPause);
+        }
+        catch(WakeupException e)
+        {
+          log.info("{} - Consumer was awakened", id);
+
+          List<TopicPartition> partitionsToResume = new LinkedList<>();
+
+          for (int partition = 0; partition < numPartitions; partition++)
+          {
+            schedulePendingFetchRequest(partition)
+              .map(fetchRequest -> fetchRequest.partition())
+              .ifPresent(topicPartition -> partitionsToResume.add(topicPartition));
+          }
+
+          consumer.resume(partitionsToResume);
         }
       }
     }
-    catch(WakeupException e)
-    {
-      log.info("{} - Consumer was signaled to finish its work", id);
-    }
     catch(Exception e)
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e);
@@ -78,25 +144,65 @@ public class DeadLetterConsumer implements Runnable
     {
       log.info("{} - Closing the KafkaConsumer", id);
       consumer.close();
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      log.info("{} - Exiting!", id);
     }
   }
 
-  private void handleRecord(
-    String topic,
-    Integer partition,
-    Long offset,
-    String key,
-    String value)
+  private Optional<FetchRequest> schedulePendingFetchRequest(int partition)
   {
-    consumed++;
-    log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+    if (currentFetchRequest[partition] == null)
+    {
+      FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll();
+      if (nextFetchRequest != null)
+      {
+        scheduleFetchRequest(nextFetchRequest);
+        return Optional.of(nextFetchRequest);
+      }
+      else
+      {
+        log.trace("{} - no pending fetch-request for partition {}.", id, partition);
+      }
+    }
+    else
+    {
+      log.debug("{} - fetch-request {} is still in progress.", id, currentFetchRequest[partition]);
+    }
+
+    return Optional.empty();
   }
 
+  private void scheduleFetchRequest(FetchRequest fetchRequest)
+  {
+    log.debug("{} - scheduling fetch-request {}.", id, fetchRequest);
+
+    currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
+    consumer.seek(fetchRequest.partition(), fetchRequest.offset());
+  }
+  Mono<String> requestRecord(int partition, long offset)
+  {
+    CompletableFuture<String> future = new CompletableFuture<>();
+
+    FetchRequest fetchRequest = new FetchRequest(
+      new TopicPartition(topic, partition),
+      offset,
+      future);
+
+    pendingFetchRequests[partition].add(fetchRequest);
+
+    log.info(
+      "{} - fetch-request for partition={}, offset={}: Waking up consumer!",
+      id,
+      partition,
+      offset);
+    consumer.wakeup();
+
+    return Mono.fromFuture(future);
+  }
 
   public void shutdown() throws InterruptedException
   {
-    log.info("{} - Waking up the consumer", id);
+    log.info("{} - Requesting shutdown", id);
+    shutdownIsRequested = true;
     consumer.wakeup();
     log.info("{} - Joining the worker thread", id);
     workerThread.join();
diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java
new file mode 100644 (file)
index 0000000..394e2c9
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+
+public record FetchRequest(
+  TopicPartition partition,
+  long offset,
+  CompletableFuture<String> future)
+{
+  @Override
+  public String toString()
+  {
+    return partition + "@" + offset;
+  }
+}
index 7a06731..0c4e07f 100644 (file)
@@ -2,10 +2,8 @@ juplo:
   bootstrap-server: :9092
   client-id: DEV
   consumer:
-    group-id: my-group
+    group-id: nodlt
     topic: test
-    auto-offset-reset: earliest
-    auto-commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -26,8 +24,6 @@ info:
     consumer:
       group-id: ${juplo.consumer.group-id}
       topic: ${juplo.consumer.topic}
-      auto-offset-reset: ${juplo.consumer.auto-offset-reset}
-      auto-commit-interval: ${juplo.consumer.auto-commit-interval}
 logging:
   level:
     root: INFO