WIP
[demos/kafka/seek] / src / main / java / de / juplo / kafka / seek / Consumer.java
index 1fb4cdd..7376945 100644 (file)
@@ -26,6 +26,7 @@ public class Consumer implements Runnable
   private final KafkaConsumer<Long, String> consumer;
 
   private boolean running = false;
+  Long offset = null;
   Future<?> future = null;
 
 
@@ -45,6 +46,7 @@ public class Consumer implements Runnable
     props.put("bootstrap.servers", bootstrapServer);
     props.put("group.id", groupId);
     props.put("client.id", clientId);
+    props.put("commit.interval.ms", 500);
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
@@ -65,6 +67,18 @@ public class Consumer implements Runnable
 
       while (running)
       {
+        if (offset != null)
+        {
+          log.info("{} - seeking to offset {}", id, offset);
+          consumer
+              .partitionsFor(topic)
+              .forEach(partition ->
+                  consumer.seek(
+                      new TopicPartition(topic, partition.partition()),
+                      offset));
+          offset = null;
+        }
+
         ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Long, String> record : records)
           log.info(
@@ -90,18 +104,14 @@ public class Consumer implements Runnable
       log.info("{} - Unsubscribing...", id);
       consumer.unsubscribe();
       running = false;
+      offset = null;
     }
   }
 
 
   public void seek(long offset)
   {
-    consumer
-        .partitionsFor(topic)
-        .forEach(partition ->
-            consumer.seek(
-                new TopicPartition(topic, partition.partition()),
-                offset));
+    this.offset = offset;
   }