Offset-Position wird in der MongoDB gespeichert
authorKai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 21:35:27 +0000 (23:35 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 21:35:27 +0000 (23:35 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

index e67bf41..69c8294 100644 (file)
@@ -64,6 +64,7 @@ public class EndlessConsumer implements Runnable
       props.put("bootstrap.servers", bootstrapServer);
       props.put("group.id", groupId);
       props.put("client.id", id);
+      props.put("enable.auto.commit", false);
       props.put("auto.offset.reset", autoOffsetReset);
       props.put("metadata.max.age.ms", "1000");
       props.put("key.deserializer", StringDeserializer.class.getName());
@@ -90,7 +91,7 @@ public class EndlessConsumer implements Runnable
                   removed.getPartition(),
                   counter.getKey());
             }
-            repository.save(new StatisticsDocument(removed));
+            repository.save(new StatisticsDocument(removed, consumer.position(tp)));
           });
         }
 
@@ -100,12 +101,12 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - adding partition: {}", id, tp);
-            seen.put(
-                tp,
+            StatisticsDocument document =
                 repository
                     .findById(tp.toString())
-                    .map(PartitionStatistics::new)
-                    .orElse(new PartitionStatistics(tp)));
+                    .orElse(new StatisticsDocument(tp));
+            consumer.seek(tp, document.offset);
+            seen.put(tp, new PartitionStatistics(document));
           });
         }
       });
index 9318c4c..e8c2e9b 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.ToString;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
@@ -16,23 +17,32 @@ public class StatisticsDocument
   public String id;
   public String topic;
   public Integer partition;
+  public long offset;
   public Map<String, Long> statistics;
 
   public StatisticsDocument()
   {
   }
 
+  public StatisticsDocument(TopicPartition tp)
+  {
+    this.topic = tp.topic();
+    this.partition = tp.partition();
+    this.offset = 0;
+  }
+
   public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
   {
     this.partition = partition;
     this.statistics = statistics;
   }
 
-  public StatisticsDocument(PartitionStatistics statistics)
+  public StatisticsDocument(PartitionStatistics statistics, long offset)
   {
     this.topic = statistics.getPartition().topic();
     this.id = statistics.toString();
     this.partition = statistics.getPartition().partition();
+    this.offset = offset;
     this.statistics = new HashMap<>();
     statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
   }