From: Kai Moritz Date: Sun, 21 Aug 2022 08:33:09 +0000 (+0200) Subject: Benennung vereinheitlicht und projektunabhängig gemacht X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Frebalance-listener;p=demos%2Fkafka%2Ftraining Benennung vereinheitlicht und projektunabhängig gemacht * Merge branch 'counting-consumer' into rebalance-listener * Außerdem die dort nicht vorhandene Klasse `KeyCountingRebalanceListener` entsprechend umbenannt. --- 068273c0b63ae06af36a4ea35c0d4af654381ded diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7a0a8ad,bf00b6d..e9c26fd --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@@ -17,27 -17,16 +17,27 @@@ import java.util.concurrent.Executors public class ApplicationConfiguration { @Bean - public KeyCountingRecordHandler keyCountingRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new KeyCountingRecordHandler(); + return new ApplicationRecordHandler(); } + @Bean - public KeyCountingRebalanceListener keyCountingRebalanceListener( - KeyCountingRecordHandler keyCountingRecordHandler, ++ public ApplicationRebalanceListener rebalanceListener( ++ ApplicationRecordHandler recordHandler, + ApplicationProperties properties) + { - return new KeyCountingRebalanceListener( - keyCountingRecordHandler, ++ return new ApplicationRebalanceListener( ++ recordHandler, + properties.getClientId()); + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - KeyCountingRebalanceListener keyCountingRebalanceListener, - KeyCountingRecordHandler keyCountingRecordHandler, ++ ApplicationRebalanceListener rebalanceListener, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@@ -46,8 -35,7 +46,8 @@@ properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, - keyCountingRecordHandler); ++ rebalanceListener, + recordHandler); } @Bean diff --cc src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0000000,0000000..0dcadce new file mode 100644 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@@ -1,0 -1,0 +1,50 @@@ ++package de.juplo.kafka; ++ ++import lombok.RequiredArgsConstructor; ++import lombok.extern.slf4j.Slf4j; ++import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; ++import org.apache.kafka.common.TopicPartition; ++ ++import java.util.Collection; ++import java.util.HashMap; ++import java.util.Map; ++ ++ ++@RequiredArgsConstructor ++@Slf4j ++public class ApplicationRebalanceListener implements ConsumerRebalanceListener ++{ ++ private final ApplicationRecordHandler recordHandler; ++ private final String id; ++ ++ @Override ++ public void onPartitionsAssigned(Collection partitions) ++ { ++ partitions.forEach(tp -> ++ { ++ Integer partition = tp.partition(); ++ log.info("{} - adding partition: {}", id, partition); ++ recordHandler.addPartition(partition, new HashMap<>()); ++ }); ++ } ++ ++ @Override ++ public void onPartitionsRevoked(Collection partitions) ++ { ++ partitions.forEach(tp -> ++ { ++ Integer partition = tp.partition(); ++ log.info("{} - removing partition: {}", id, partition); ++ Map removed = recordHandler.removePartition(partition); ++ for (String key : removed.keySet()) ++ { ++ log.info( ++ "{} - Seen {} messages for partition={}|key={}", ++ id, ++ removed.get(key), ++ partition, ++ key); ++ } ++ }); ++ } ++} diff --cc src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 0000000,3492c0d..dfbf82e mode 000000,100644..100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@@ -1,0 -1,40 +1,46 @@@ + package de.juplo.kafka; + + import lombok.extern.slf4j.Slf4j; + import org.apache.kafka.clients.consumer.ConsumerRecord; + + import java.util.HashMap; + import java.util.Map; + + + @Slf4j + public class ApplicationRecordHandler implements RecordHandler + { + private final Map> state = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key().toString(); - - if (!state.containsKey(partition)) - state.put(partition, new HashMap<>()); - + Map byKey = state.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); + } + ++ public void addPartition(Integer partition, Map statistics) ++ { ++ state.put(partition, statistics); ++ } ++ ++ public Map removePartition(Integer partition) ++ { ++ return state.remove(partition); ++ } ++ + + public Map> getState() + { + return state; + } + } diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java index 5b13b7d,ffc0a0b..d7eb039 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@@ -65,9 -65,7 +65,9 @@@ class ApplicationTest @Autowired ExecutorService executor; @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; ++ ApplicationRebalanceListener rebalanceListener; + @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@@ -287,7 -285,6 +287,7 @@@ properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, ++ rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();