Die Key-Statistiken werden in einer MongoDB gespeichert
[demos/kafka/training] / src / main / java / de / juplo / kafka / PartitionStatistics.java
1 package de.juplo.kafka;
2
3 import lombok.EqualsAndHashCode;
4 import lombok.Getter;
5 import org.apache.kafka.common.TopicPartition;
6
7 import java.util.Collection;
8 import java.util.HashMap;
9 import java.util.Map;
10
11
12 @Getter
13 @EqualsAndHashCode(of = { "partition" })
14 public class PartitionStatistics
15 {
16   private String id;
17   private final TopicPartition partition;
18   private final Map<String, KeyCounter> statistics = new HashMap<>();
19
20
21   public PartitionStatistics(TopicPartition partition)
22   {
23     this.partition = partition;
24   }
25
26   public PartitionStatistics(StatisticsDocument document)
27   {
28     this.partition = new TopicPartition(document.topic, document.partition);
29     document
30         .statistics
31         .entrySet()
32         .forEach(entry ->
33         {
34           this.statistics.put(
35               entry.getKey(),
36               new KeyCounter(entry.getKey(), entry.getValue()));
37         });
38   }
39
40
41   public KeyCounter addKey(String key)
42   {
43     KeyCounter counter = new KeyCounter(key);
44
45     counter.increment();
46     statistics.put(key, counter);
47
48     return counter;
49   }
50
51
52   public long increment(String key)
53   {
54     KeyCounter counter = statistics.get(key);
55
56     if (counter == null)
57     {
58       counter = new KeyCounter(key);
59       statistics.put(key, counter);
60     }
61
62     return counter.increment();
63   }
64
65   public Collection<KeyCounter> getStatistics()
66   {
67     return statistics.values();
68   }
69
70   @Override
71   public String toString()
72   {
73     return partition.toString();
74   }
75 }