From: Kai Moritz Date: Sun, 21 Aug 2022 08:33:09 +0000 (+0200) Subject: Benennung vereinheitlicht und projektunabhängig gemacht X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Frebalance-listener;hp=61581ed5dfbb70f66390e7c3e9c261c6e6aa74d4;p=demos%2Fkafka%2Ftraining Benennung vereinheitlicht und projektunabhängig gemacht * Merge branch 'counting-consumer' into rebalance-listener * Außerdem die dort nicht vorhandene Klasse `KeyCountingRebalanceListener` entsprechend umbenannt. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index bf00b6d..e9c26fd 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -22,10 +22,21 @@ public class ApplicationConfiguration return new ApplicationRecordHandler(); } + @Bean + public ApplicationRebalanceListener rebalanceListener( + ApplicationRecordHandler recordHandler, + ApplicationProperties properties) + { + return new ApplicationRebalanceListener( + recordHandler, + properties.getClientId()); + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, ApplicationProperties properties) { @@ -35,6 +46,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, + rebalanceListener, recordHandler); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java new file mode 100644 index 0000000..0dcadce --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -0,0 +1,50 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +@RequiredArgsConstructor +@Slf4j +public class ApplicationRebalanceListener implements ConsumerRebalanceListener +{ + private final ApplicationRecordHandler recordHandler; + private final String id; + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + log.info("{} - adding partition: {}", id, partition); + recordHandler.addPartition(partition, new HashMap<>()); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + log.info("{} - removing partition: {}", id, partition); + Map removed = recordHandler.removePartition(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + }); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 3492c0d..dfbf82e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -18,10 +18,6 @@ public class ApplicationRecordHandler implements RecordHandler { Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key().toString(); - - if (!state.containsKey(partition)) - state.put(partition, new HashMap<>()); - Map byKey = state.get(partition); if (!byKey.containsKey(key)) @@ -32,6 +28,16 @@ public class ApplicationRecordHandler implements RecordHandler byKey.put(key, seenByKey); } + public void addPartition(Integer partition, Map statistics) + { + state.put(partition, statistics); + } + + public Map removePartition(Integer partition) + { + return state.remove(partition); + } + public Map> getState() { diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 63a2f93..c7579b8 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,6 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final ConsumerRebalanceListener consumerRebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -41,7 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener); while (true) { diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ffc0a0b..d7eb039 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -65,6 +65,8 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired + ApplicationRebalanceListener rebalanceListener; + @Autowired ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; @@ -285,6 +287,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();