WIP
authorKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 08:38:07 +0000 (10:38 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 11 Jun 2021 08:38:07 +0000 (10:38 +0200)
README.sh
src/main/java/de/juplo/kafka/seek/Consumer.java

index e01b9c5..f36d5b0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -3,27 +3,40 @@
 if [ "$1" = "cleanup" ]
 then
   docker-compose down -v
+  mvn clean
+  docker image rm juplo/seek:1.0-SNAPSHOT
   exit
 fi
 
 docker-compose up -d zookeeper kafka
 
+if [[
+  $(docker image ls -q juplo/seek:1.0-SNAPSHOT) == "" ||
+  "$1" = "build"
+]]
+then
+  mvn install || exit
+else
+  echo "Using image existing images:"
+  docker image ls juplo/seek:1.0-SNAPSHOT
+fi
+
 while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
 do
   echo "Waiting for kafka...";
   sleep 1;
 done
 
-docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic foo
-
-docker-compose up -d producer consumer
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 7 --topic test
 
+docker-compose up -d producer
+docker-compose up -d perter franz beate ute klaus paul sigi
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest 
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
 
-docker-compose stop producer consumer
+docker-compose stop producer franz beate ute klaus paul sigi
 docker-compose logs consumer
index 1fb4cdd..7376945 100644 (file)
@@ -26,6 +26,7 @@ public class Consumer implements Runnable
   private final KafkaConsumer<Long, String> consumer;
 
   private boolean running = false;
+  Long offset = null;
   Future<?> future = null;
 
 
@@ -45,6 +46,7 @@ public class Consumer implements Runnable
     props.put("bootstrap.servers", bootstrapServer);
     props.put("group.id", groupId);
     props.put("client.id", clientId);
+    props.put("commit.interval.ms", 500);
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
@@ -65,6 +67,18 @@ public class Consumer implements Runnable
 
       while (running)
       {
+        if (offset != null)
+        {
+          log.info("{} - seeking to offset {}", id, offset);
+          consumer
+              .partitionsFor(topic)
+              .forEach(partition ->
+                  consumer.seek(
+                      new TopicPartition(topic, partition.partition()),
+                      offset));
+          offset = null;
+        }
+
         ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Long, String> record : records)
           log.info(
@@ -90,18 +104,14 @@ public class Consumer implements Runnable
       log.info("{} - Unsubscribing...", id);
       consumer.unsubscribe();
       running = false;
+      offset = null;
     }
   }
 
 
   public void seek(long offset)
   {
-    consumer
-        .partitionsFor(topic)
-        .forEach(partition ->
-            consumer.seek(
-                new TopicPartition(topic, partition.partition()),
-                offset));
+    this.offset = offset;
   }