Merge branch 'rebalance-listener' into stored-state
authorKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
1  2 
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

@@@ -86,11 -83,10 +86,11 @@@ public class EndlessConsumer implement
                log.info(
                    "{} - Seen {} messages for partition={}|key={}",
                    id,
-                   counter.getResult(),
-                   removed.getPartition(),
-                   counter.getKey());
+                   removed.get(key),
+                   tp.partition(),
+                   key);
              }
-             repository.save(new StatisticsDocument(removed));
++            repository.save(new StatisticsDocument(tp.partition(), removed));
            });
          }
  
            partitions.forEach(tp ->
            {
              log.info("{} - adding partition: {}", id, tp);
 -            seen.put(tp.partition(), new HashMap<>());
 +            seen.put(
-                 tp,
++                tp.partition(),
 +                repository
-                     .findById(tp.toString())
-                     .map(PartitionStatistics::new)
-                     .orElse(new PartitionStatistics(tp)));
++                    .findById(Integer.toString(tp.partition()))
++                    .map(document -> document.statistics)
++                    .orElse(new HashMap<>()));
            });
          }
        });
index 9318c4c,0000000..be998ca
mode 100644,000000..100644
--- /dev/null
@@@ -1,39 -1,0 +1,28 @@@
-   public String topic;
-   public Integer partition;
-   public Map<String, Long> statistics;
 +package de.juplo.kafka;
 +
 +import lombok.ToString;
 +import org.springframework.data.annotation.Id;
 +import org.springframework.data.mongodb.core.mapping.Document;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +
 +@Document(collection = "statistics")
 +@ToString
 +public class StatisticsDocument
 +{
 +  @Id
 +  public String id;
-   public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
++  public Map<String, Integer> statistics;
 +
 +  public StatisticsDocument()
 +  {
 +  }
 +
-     this.partition = partition;
++  public StatisticsDocument(Integer partition, Map<String, Integer> statistics)
 +  {
-   public StatisticsDocument(PartitionStatistics statistics)
-   {
-     this.topic = statistics.getPartition().topic();
-     this.id = statistics.toString();
-     this.partition = statistics.getPartition().partition();
-     this.statistics = new HashMap<>();
-     statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
-   }
++    this.id = Integer.toString(partition);
 +    this.statistics = statistics;
 +  }
 +}