Merge branch 'stored-state' into stored-offsets
[demos/kafka/training] / src / main / java / de / juplo / kafka / StatisticsDocument.java
index e8c2e9b..96ebfb1 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.ToString;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
@@ -15,35 +14,23 @@ public class StatisticsDocument
 {
   @Id
   public String id;
-  public String topic;
-  public Integer partition;
   public long offset;
-  public Map<String, Long> statistics;
+  public Map<String, Integer> statistics;
 
   public StatisticsDocument()
   {
   }
 
-  public StatisticsDocument(TopicPartition tp)
+  public StatisticsDocument(Integer partition)
   {
-    this.topic = tp.topic();
-    this.partition = tp.partition();
-    this.offset = 0;
+    this.id = Integer.toString(partition);
+    this.statistics = new HashMap<>();
   }
 
-  public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
+  public StatisticsDocument(Integer partition, Map<String, Integer> statistics, long offset)
   {
-    this.partition = partition;
+    this.id = Integer.toString(partition);
     this.statistics = statistics;
-  }
-
-  public StatisticsDocument(PartitionStatistics statistics, long offset)
-  {
-    this.topic = statistics.getPartition().topic();
-    this.id = statistics.toString();
-    this.partition = statistics.getPartition().partition();
     this.offset = offset;
-    this.statistics = new HashMap<>();
-    statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
   }
 }