From: Kai Moritz Date: Sat, 13 Aug 2022 09:15:54 +0000 (+0200) Subject: Verbesserungen aus 'rebalance-listener' nach 'counting-consumer' gemerged X-Git-Tag: sumup-requests---lvm-2-tage~5^2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=487ca1ac0c73b1c07547d5316d145ed46490ba9d;hp=de0468e4db973312e61ad4894edc092e84655161;p=demos%2Fkafka%2Ftraining Verbesserungen aus 'rebalance-listener' nach 'counting-consumer' gemerged --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7a0a8ad..bb219d0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -22,21 +22,10 @@ public class ApplicationConfiguration return new KeyCountingRecordHandler(); } - @Bean - public KeyCountingRebalanceListener keyCountingRebalanceListener( - KeyCountingRecordHandler keyCountingRecordHandler, - ApplicationProperties properties) - { - return new KeyCountingRebalanceListener( - keyCountingRecordHandler, - properties.getClientId()); - } - @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - KeyCountingRebalanceListener keyCountingRebalanceListener, KeyCountingRecordHandler keyCountingRecordHandler, ApplicationProperties properties) { @@ -46,7 +35,6 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, keyCountingRecordHandler); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index c7579b8..63a2f93 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,6 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener consumerRebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -42,7 +41,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener); + consumer.subscribe(Arrays.asList(topic)); while (true) { diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java deleted file mode 100644 index 0ad1f31..0000000 --- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java +++ /dev/null @@ -1,50 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - - -@RequiredArgsConstructor -@Slf4j -public class KeyCountingRebalanceListener implements ConsumerRebalanceListener -{ - private final KeyCountingRecordHandler handler; - private final String id; - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - handler.addPartition(partition, new HashMap<>()); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - Map removed = handler.removePartition(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - }); - } -} diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java index 099dcf7..83b3ff2 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java +++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java @@ -18,6 +18,10 @@ public class KeyCountingRecordHandler implements RecordHandler { Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key().toString(); + + if (!seen.containsKey(partition)) + seen.put(partition, new HashMap<>()); + Map byKey = seen.get(partition); if (!byKey.containsKey(key)) @@ -28,16 +32,6 @@ public class KeyCountingRecordHandler implements RecordHandler byKey.put(key, seenByKey); } - public void addPartition(Integer partition, Map statistics) - { - seen.put(partition, statistics); - } - - public Map removePartition(Integer partition) - { - return seen.remove(partition); - } - public Map> getSeen() { diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 5b13b7d..0909f2c 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -65,8 +65,6 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; - @Autowired KeyCountingRecordHandler keyCountingRecordHandler; EndlessConsumer endlessConsumer; @@ -287,7 +285,6 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();