From 818c1eb862247e25abf9f7d91d5a73e3e3789a39 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Aug 2022 11:13:54 +0200 Subject: [PATCH] =?utf8?q?refactor:=20RebalanceListener=20als=20eigenst?= =?utf8?q?=C3=A4ndige=20Klasse?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 9 +++++++ .../java/de/juplo/kafka/EndlessConsumer.java | 17 +++--------- .../kafka/WordcountRebalanceListener.java | 27 +++++++++++++++++++ .../java/de/juplo/kafka/ApplicationTests.java | 5 +++- 4 files changed, 43 insertions(+), 15 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/WordcountRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b077a90..da1605b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -32,10 +32,18 @@ public class ApplicationConfiguration consumer); } + @Bean + public WordcountRebalanceListener wordcountRebalanceListener( + WordcountRecordHandler wordcountRecordHandler) + { + return new WordcountRebalanceListener(wordcountRecordHandler); + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + WordcountRebalanceListener wordcountRebalanceListener, WordcountRecordHandler wordcountRecordHandler, ApplicationProperties properties) { @@ -45,6 +53,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, + wordcountRebalanceListener, wordcountRecordHandler); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0c107f3..0f3316d 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -19,12 +19,13 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements ConsumerRebalanceListener, Runnable +public class EndlessConsumer implements Runnable { private final ExecutorService executor; private final String id; private final String topic; private final Consumer consumer; + private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -34,18 +35,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl private long consumed = 0; - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> handler.onPartitionRevoked(tp)); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> handler.onPartitionAssigned(tp)); - } - @Override public void run() @@ -53,7 +42,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java new file mode 100644 index 0000000..fd551c2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java @@ -0,0 +1,27 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + + +@RequiredArgsConstructor +public class WordcountRebalanceListener implements ConsumerRebalanceListener +{ + private final RecordHandler handler; + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> handler.onPartitionAssigned(tp)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> handler.onPartitionRevoked(tp)); + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 408a826..f4c2104 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -70,7 +70,9 @@ class ApplicationTests ExecutorService executor; @Autowired PartitionStatisticsRepository repository; - @Autowired + @Autowired + WordcountRebalanceListener wordcountRebalanceListener; + @Autowired WordcountRecordHandler wordcountRecordHandler; EndlessConsumer endlessConsumer; @@ -233,6 +235,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, + wordcountRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); -- 2.20.1