Offset-Position in der MongoDB wird vor jedem `poll()` aktualisiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / StatisticsDocument.java
1 package de.juplo.kafka;
2
3 import lombok.ToString;
4 import org.apache.kafka.common.TopicPartition;
5 import org.springframework.data.annotation.Id;
6 import org.springframework.data.mongodb.core.mapping.Document;
7
8 import java.util.HashMap;
9 import java.util.Map;
10
11
12 @Document(collection = "statistics")
13 @ToString
14 public class StatisticsDocument
15 {
16   @Id
17   public String id;
18   public String topic;
19   public Integer partition;
20   public long offset;
21   public Map<String, Long> statistics;
22
23   public StatisticsDocument()
24   {
25   }
26
27   public StatisticsDocument(TopicPartition tp)
28   {
29     this.topic = tp.topic();
30     this.partition = tp.partition();
31     this.offset = 0;
32   }
33
34   public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
35   {
36     this.partition = partition;
37     this.statistics = statistics;
38   }
39
40   public StatisticsDocument(PartitionStatistics statistics, long offset)
41   {
42     this.topic = statistics.getPartition().topic();
43     this.id = statistics.toString();
44     this.partition = statistics.getPartition().partition();
45     this.offset = offset;
46     this.statistics = new HashMap<>();
47     statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
48   }
49 }