From: Kai Moritz Date: Thu, 7 Apr 2022 23:13:01 +0000 (+0200) Subject: Merge branch 'rebalance-listener' into stored-state X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~7 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=bb0e25349bb87cf14f971694da4fa6c2031cb205;hp=-c;p=demos%2Fkafka%2Ftraining Merge branch 'rebalance-listener' into stored-state --- bb0e25349bb87cf14f971694da4fa6c2031cb205 diff --combined src/main/java/de/juplo/kafka/Application.java index 23c845a,dd4b20a..bcbf418 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@@ -5,7 -5,6 +5,6 @@@ import org.springframework.boot.SpringA import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; - import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.util.Assert; import java.util.concurrent.Executors; @@@ -20,7 -19,7 +19,7 @@@ public class Applicatio @Bean - public EndlessConsumer consumer() + public EndlessConsumer consumer(PartitionStatisticsRepository repository) { Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); @@@ -30,7 -29,6 +29,7 @@@ EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), + repository, properties.getBootstrapServer(), properties.getGroupId(), properties.getClientId(), @@@ -42,16 -40,6 +41,6 @@@ return consumer; } - @Bean - public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder() - { - return - new Jackson2ObjectMapperBuilder().serializers( - new TopicPartitionSerializer(), - new PartitionStatisticsSerializer()); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --combined src/main/java/de/juplo/kafka/EndlessConsumer.java index e67bf41,14a875b..7cb77aa --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -22,7 -22,6 +22,7 @@@ import java.util.concurrent.atomic.Atom public class EndlessConsumer implements Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String bootstrapServer; private final String groupId; private final String id; @@@ -34,12 -33,11 +34,12 @@@ private KafkaConsumer consumer = null; private Future future = null; - private final Map seen = new HashMap<>(); + private final Map> seen = new HashMap<>(); public EndlessConsumer( ExecutorService executor, + PartitionStatisticsRepository repository, String bootstrapServer, String groupId, String clientId, @@@ -47,7 -45,6 +47,7 @@@ String autoOffsetReset) { this.executor = executor; + this.repository = repository; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; @@@ -80,17 -77,16 +80,17 @@@ partitions.forEach(tp -> { log.info("{} - removing partition: {}", id, tp); - PartitionStatistics removed = seen.remove(tp); - for (KeyCounter counter : removed.getStatistics()) + Map removed = seen.remove(tp.partition()); + for (String key : removed.keySet()) { log.info( "{} - Seen {} messages for partition={}|key={}", id, - counter.getResult(), - removed.getPartition(), - counter.getKey()); + removed.get(key), + tp.partition(), + key); } - repository.save(new StatisticsDocument(removed)); ++ repository.save(new StatisticsDocument(tp.partition(), removed)); }); } @@@ -100,12 -96,7 +100,12 @@@ partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp.partition(), new HashMap<>()); + seen.put( - tp, ++ tp.partition(), + repository - .findById(tp.toString()) - .map(PartitionStatistics::new) - .orElse(new PartitionStatistics(tp))); ++ .findById(Integer.toString(tp.partition())) ++ .map(document -> document.statistics) ++ .orElse(new HashMap<>())); }); } }); @@@ -130,9 -121,16 +130,16 @@@ record.value() ); - TopicPartition partition = new TopicPartition(record.topic(), record.partition()); + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); - seen.get(partition).increment(key); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } @@@ -153,7 -151,7 +160,7 @@@ } } - public Map getSeen() + public Map> getSeen() { return seen; } diff --combined src/main/java/de/juplo/kafka/StatisticsDocument.java index 9318c4c,0000000..be998ca mode 100644,000000..100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@@ -1,39 -1,0 +1,28 @@@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "statistics") +@ToString +public class StatisticsDocument +{ + @Id + public String id; - public String topic; - public Integer partition; - public Map statistics; ++ public Map statistics; + + public StatisticsDocument() + { + } + - public StatisticsDocument(String topic, Integer partition, Map statistics) ++ public StatisticsDocument(Integer partition, Map statistics) + { - this.partition = partition; ++ this.id = Integer.toString(partition); + this.statistics = statistics; + } - - public StatisticsDocument(PartitionStatistics statistics) - { - this.topic = statistics.getPartition().topic(); - this.id = statistics.toString(); - this.partition = statistics.getPartition().partition(); - this.statistics = new HashMap<>(); - statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult())); - } +}