Offset-Position wird in der MongoDB gespeichert
[demos/kafka/training] / src / main / java / de / juplo / kafka / StatisticsDocument.java
index 9318c4c..e8c2e9b 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.ToString;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
@@ -16,23 +17,32 @@ public class StatisticsDocument
   public String id;
   public String topic;
   public Integer partition;
+  public long offset;
   public Map<String, Long> statistics;
 
   public StatisticsDocument()
   {
   }
 
+  public StatisticsDocument(TopicPartition tp)
+  {
+    this.topic = tp.topic();
+    this.partition = tp.partition();
+    this.offset = 0;
+  }
+
   public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
   {
     this.partition = partition;
     this.statistics = statistics;
   }
 
-  public StatisticsDocument(PartitionStatistics statistics)
+  public StatisticsDocument(PartitionStatistics statistics, long offset)
   {
     this.topic = statistics.getPartition().topic();
     this.id = statistics.toString();
     this.partition = statistics.getPartition().partition();
+    this.offset = offset;
     this.statistics = new HashMap<>();
     statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
   }