+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode(of = { "partition" })
+public class PartitionStatistics
+{
+ private final TopicPartition partition;
+ private final Map<String, KeyCounter> statistics = new HashMap<>();
+
+
+ public KeyCounter addKey(String key)
+ {
+ KeyCounter counter = new KeyCounter(key);
+
+ counter.increment();
+ statistics.put(key, counter);
+
+ return counter;
+ }
+
+
+ public long increment(String key)
+ {
+ KeyCounter counter = statistics.get(key);
+
+ if (counter == null)
+ {
+ counter = new KeyCounter(key);
+ statistics.put(key, counter);
+ }
+
+ return counter.increment();
+ }
+
+ public Collection<KeyCounter> getStatistics()
+ {
+ return statistics.values();
+ }
+
+ @Override
+ public String toString()
+ {
+ return partition.toString();
+ }
+}