Version des ``spring-consumer``, die zu vorgegebenen Offsets springt consumer/spring-consumer--seek consumer/spring-consumer--seek--2024-11-13--si
authorKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 11:10:10 +0000 (12:10 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:24:16 +0000 (14:24 +0100)
* Die übergebenen Offsets werden bei der Zuteilung der Partitionen
  verarbeitet.
* Anschließend werden die übergebenen Start-Offsets gelöscht, so dass bei
  späteren Rebalances kein erneuter Wechsel der Offset-Position erfolgt.
* Die Anwendung überprüft nicht, ob die Menge der übergebenen Offsets
  vollständig ist, oder zu den zugeteilten Partitionen passt.
* D.h. insbesondere, dass ein Wechsel der Offset-Positionen für eine
  bestimmte Partition ggf. erst später erfolgt, wenn diese Partition einer
  anderen Instanz zugeteilt ist und diese Instanz die Partition erst
  später frei gibt.
* Docker-Setup und `README.sh` zur Vorführung angepasst

README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index b46e235..3ec6309 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-seek-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer seek
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -29,11 +29,12 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 docker compose -f docker/docker-compose.yml up -d producer
 docker compose -f docker/docker-compose.yml up -d consumer
 
+sleep 5
+docker compose -f docker/docker-compose.yml stop producer
+docker compose -f docker/docker-compose.yml up -d seek
 sleep 5
 docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
 sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml restart seek
+sleep 10
+docker compose -f docker/docker-compose.yml logs seek
index 6bd2766..84c2d50 100644 (file)
@@ -199,12 +199,20 @@ services:
       juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
       juplo.consumer.topic: test
 
+  seek:
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
+    environment:
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: seek
+      juplo.consumer.topic: test
+      juplo.consumer.offsets: 0=5,1=7
+
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index 98a0a36..405a349 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-seek-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index a4856a6..f8547a9 100644 (file)
@@ -3,13 +3,18 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 
 @Configuration
@@ -22,10 +27,27 @@ public class ApplicationConfiguration
       ApplicationProperties properties,
       ConfigurableApplicationContext applicationContext)
   {
+    Map<TopicPartition, Long> offsets;
+
+    if (properties.getConsumerProperties().getOffsets() == null)
+    {
+      offsets = new HashMap<>();
+    }
+    else
+    {
+      offsets = Arrays
+        .stream(properties.getConsumerProperties().getOffsets())
+        .map(partition -> partition.split("="))
+        .collect(Collectors.toMap(
+          parts -> new TopicPartition(properties.getConsumer().getTopic(), Integer.parseInt(parts[0])),
+          parts -> Long.parseLong(parts[1])));
+    }
+
     return
         new ExampleConsumer(
             properties.getClientId(),
             properties.getConsumerProperties().getTopic(),
+            offsets,
             kafkaConsumer,
             () -> applicationContext.close());
   }
index c8193c9..ae97d75 100644 (file)
@@ -44,6 +44,7 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
+    private String[] offsets;
     private OffsetReset autoOffsetReset;
     private Duration autoCommitInterval;
 
index f832b45..8cb3698 100644 (file)
@@ -2,19 +2,22 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
+  private final Map<TopicPartition, Long> offsets;
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
   private final Runnable closeCallback;
@@ -26,11 +29,13 @@ public class ExampleConsumer implements Runnable
   public ExampleConsumer(
     String clientId,
     String topic,
+    Map<TopicPartition, Long> offsets,
     Consumer<String, String> consumer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
+    this.offsets = offsets;
     this.consumer = consumer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
@@ -46,7 +51,7 @@ public class ExampleConsumer implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
@@ -97,6 +102,27 @@ public class ExampleConsumer implements Runnable
   }
 
 
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(partition ->
+    {
+      Long offset = offsets.get(partition);
+      if (offset != null)
+      {
+        log.info("{} - Seeking to offset {} for partition {}", id, offset, partition);
+        consumer.seek(partition, offset);
+        offsets.remove(partition);
+      }
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+  }
+
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);