From 59018fa4f215c1b0dd877d814313f34c3fb8d719 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 10 Nov 2024 12:10:10 +0100
Subject: [PATCH] Version of the ``spring-consumer`` that seeks to predefined
 offsets
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* The supplied offsets are processed when the partitions are assigned
  (see the sketch after this list).
* Afterwards, the supplied start offsets are removed, so that later
  rebalances do not trigger another change of the offset position.
* The application does not check whether the set of supplied offsets is
  complete or matches the assigned partitions.
* In particular, this means that the offset position for a given
  partition may only be changed later, namely once that partition has
  been assigned to another instance and is released by that instance at
  a later point in time.
* Adjusted the Docker setup and `README.sh` for the demonstration
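
The essence of the mechanism, condensed into a standalone sketch: the
class and helper names (`SeekOnAssignSketch`, `parseOffsets`, `seekOnce`)
are illustrative only and do not appear in the patched sources; the
Kafka API used (`TopicPartition`, `Consumer.seek()`) is the same as in
the `ExampleConsumer` changes below.

    import java.util.*;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    class SeekOnAssignSketch
    {
      // "0=5,1=7" for topic "test" becomes {test-0: 5, test-1: 7}
      static Map<TopicPartition, Long> parseOffsets(String topic, String[] config)
      {
        // Wrapped in a HashMap so that seekOnce() may remove entries later
        return new HashMap<>(Arrays
          .stream(config)
          .map(entry -> entry.split("="))
          .collect(Collectors.toMap(
            parts -> new TopicPartition(topic, Integer.parseInt(parts[0])),
            parts -> Long.parseLong(parts[1]))));
      }

      // Called when partitions are assigned: seek once, then forget the
      // offset, so that later rebalances leave the position untouched
      static void seekOnce(
        Consumer<?, ?> consumer,
        Map<TopicPartition, Long> offsets,
        Collection<TopicPartition> assigned)
      {
        assigned.forEach(partition ->
        {
          Long offset = offsets.remove(partition);
          if (offset != null)
          {
            consumer.seek(partition, offset);
          }
        });
      }
    }

In the demo below, the `seek` service sets `juplo.consumer.offsets: 0=5,1=7`;
`ApplicationConfiguration` turns this value into exactly such a map and
hands it to the `ExampleConsumer`.
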
---
 README.sh                                     | 15 +++++----
 docker/docker-compose.yml                     | 14 ++++++--
 pom.xml                                       |  2 +-
 .../juplo/kafka/ApplicationConfiguration.java | 22 +++++++++++++
 .../de/juplo/kafka/ApplicationProperties.java |  1 +
 .../java/de/juplo/kafka/ExampleConsumer.java  | 32 +++++++++++++++++--
 6 files changed, 72 insertions(+), 14 deletions(-)

diff --git a/README.sh b/README.sh
index b46e2350..3ec63094 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-seek-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer seek
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -29,11 +29,12 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 docker compose -f docker/docker-compose.yml up -d producer
 docker compose -f docker/docker-compose.yml up -d consumer
 
+sleep 5
+docker compose -f docker/docker-compose.yml stop producer
+docker compose -f docker/docker-compose.yml up -d seek
 sleep 5
 docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
 sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml restart seek
+sleep 10
+docker compose -f docker/docker-compose.yml logs seek
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 4fa2eade..7c9ebb30 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -145,26 +145,34 @@ services:
       juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
       juplo.consumer.topic: test
 
+  seek:
+    image: juplo/spring-consumer:1.1-seek-SNAPSHOT
+    environment:
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: seek
+      juplo.consumer.topic: test
+      juplo.consumer.offsets: 0=5,1=7
+
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index dd96d00f..40398c59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-seek-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index d2b8e05c..5b3c51b2 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -3,13 +3,18 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 
 @Configuration
@@ -22,10 +27,27 @@ public class ApplicationConfiguration
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
   {
+    Map<TopicPartition, Long> offsets;
+
+    if (properties.getConsumerProperties().getOffsets() == null)
+    {
+      offsets = new HashMap<>();
+    }
+    else
+    {
+      offsets = Arrays
+        .stream(properties.getConsumerProperties().getOffsets())
+        .map(partition -> partition.split("="))
+        .collect(Collectors.toMap(
+          parts -> new TopicPartition(properties.getConsumerProperties().getTopic(), Integer.parseInt(parts[0])),
+          parts -> Long.parseLong(parts[1])));
+    }
+
     return
       new ExampleConsumer<>(
         properties.getClientId(),
         properties.getConsumerProperties().getTopic(),
+        offsets,
         kafkaConsumer,
         () -> applicationContext.close());
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c8193c9f..ae97d75d 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -44,6 +44,7 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
+    private String[] offsets;
     private OffsetReset autoOffsetReset;
     private Duration autoCommitInterval;
 
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index a6691c3b..21f98142 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -2,19 +2,22 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
 
 
 @Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
+  private final Map<TopicPartition, Long> offsets;
   private final Consumer<K, V> consumer;
   private final Thread workerThread;
   private final Runnable closeCallback;
@@ -26,11 +29,13 @@ public class ExampleConsumer<K, V> implements Runnable
   public ExampleConsumer(
     String clientId,
     String topic,
+    Map<TopicPartition, Long> offsets,
     Consumer<K, V> consumer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
+    this.offsets = offsets;
     this.consumer = consumer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
@@ -46,7 +51,7 @@ public class ExampleConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
@@ -96,6 +101,27 @@ public class ExampleConsumer<K, V> implements Runnable
   }
 
 
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(partition ->
+    {
+      Long offset = offsets.get(partition);
+      if (offset != null)
+      {
+        log.info("{} - Seeking to offset {} for partition {}", id, offset, partition);
+        consumer.seek(partition, offset);
+        offsets.remove(partition);
+      }
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+  }
+
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);
-- 
2.20.1