WIP
[demos/kafka/seek] / src / main / java / de / juplo / kafka / seek / Consumer.java
index 0c232d6..77e156b 100644 (file)
@@ -4,8 +4,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import javax.annotation.PreDestroy;
@@ -26,6 +26,7 @@ public class Consumer implements Runnable
   private final KafkaConsumer<Long, String> consumer;
 
   private boolean running = false;
+  Long offset = null;
   Future<?> future = null;
 
 
@@ -45,7 +46,8 @@ public class Consumer implements Runnable
     props.put("bootstrap.servers", bootstrapServer);
     props.put("group.id", groupId);
     props.put("client.id", clientId);
-    props.put("key.deserializer", LongDeserializer.class.getName());
+    props.put("commit.interval.ms", 500);
+    props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
     consumer = new KafkaConsumer<>(props);
@@ -55,16 +57,25 @@ public class Consumer implements Runnable
   @Override
   public void run()
   {
-    log.info("{} - Subscribing to topic test", id);
-    consumer.subscribe(Arrays.asList(topic));
-
     try
     {
-
-      running = true;
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (running)
       {
+        if (offset != null)
+        {
+          log.info("{} - seeking to offset {}", id, offset);
+          consumer
+              .partitionsFor(topic)
+              .forEach(partition ->
+                  consumer.seek(
+                      new TopicPartition(topic, partition.partition()),
+                      offset));
+          offset = null;
+        }
+
         ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Long, String> record : records)
           log.info(
@@ -81,27 +92,40 @@ public class Consumer implements Runnable
     {
       log.info("{} - RIIING!", id);
     }
+    catch(Exception e)
+    {
+      log.error("{} - Unexpected error: {}", id, e.toString());
+    }
     finally
     {
       log.info("{} - Unsubscribing...", id);
-      consumer.unsubscribe();
       running = false;
+      consumer.unsubscribe();
+      offset = null;
     }
   }
 
+
+  public void seek(long offset)
+  {
+    this.offset = offset;
+  }
+
+
   public synchronized void start()
   {
     if (running)
       throw new RuntimeException("Consumier instance " + id + " is already running!");
 
     log.info("Running {}", id);
+    running = true;
     future = executor.submit(this);
   }
 
   public synchronized void stop() throws ExecutionException, InterruptedException
   {
     if (!running)
-      throw new RuntimeException("Consumier instance " + id + " is not running!");
+      throw new RuntimeException("Consumer instance " + id + " is not running!");
 
     log.info("Stopping {}", id);
     running = false;