From: Kai Moritz Date: Thu, 7 Apr 2022 23:13:01 +0000 (+0200) Subject: Merge branch 'rebalance-listener' into stored-state X-Git-Tag: wip-DEPRECATED~13^2~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bb0e25349bb87cf14f971694da4fa6c2031cb205;p=demos%2Fkafka%2Ftraining Merge branch 'rebalance-listener' into stored-state --- bb0e25349bb87cf14f971694da4fa6c2031cb205 diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java index e67bf41,14a875b..7cb77aa --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -86,11 -83,10 +86,11 @@@ public class EndlessConsumer implement log.info( "{} - Seen {} messages for partition={}|key={}", id, - counter.getResult(), - removed.getPartition(), - counter.getKey()); + removed.get(key), + tp.partition(), + key); } - repository.save(new StatisticsDocument(removed)); ++ repository.save(new StatisticsDocument(tp.partition(), removed)); }); } @@@ -100,12 -96,7 +100,12 @@@ partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp.partition(), new HashMap<>()); + seen.put( - tp, ++ tp.partition(), + repository - .findById(tp.toString()) - .map(PartitionStatistics::new) - .orElse(new PartitionStatistics(tp))); ++ .findById(Integer.toString(tp.partition())) ++ .map(document -> document.statistics) ++ .orElse(new HashMap<>())); }); } }); diff --cc src/main/java/de/juplo/kafka/StatisticsDocument.java index 9318c4c,0000000..be998ca mode 100644,000000..100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@@ -1,39 -1,0 +1,28 @@@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "statistics") +@ToString +public class StatisticsDocument +{ + @Id + public String id; - public String topic; - public Integer partition; - public Map statistics; ++ public Map statistics; + + public StatisticsDocument() + { + } + - public StatisticsDocument(String topic, Integer partition, Map statistics) ++ public StatisticsDocument(Integer partition, Map statistics) + { - this.partition = partition; ++ this.id = Integer.toString(partition); + this.statistics = statistics; + } - - public StatisticsDocument(PartitionStatistics statistics) - { - this.topic = statistics.getPartition().topic(); - this.id = statistics.toString(); - this.partition = statistics.getPartition().partition(); - this.statistics = new HashMap<>(); - statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult())); - } +}