From 924fb13cf514f79d2b01206476c2dea8536c8d44 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 21 Aug 2022 13:06:19 +0200 Subject: [PATCH] Vorlage vereinfacht: Rebalance-Listener entfernt --- .../juplo/kafka/ApplicationConfiguration.java | 16 ------ .../kafka/ApplicationRebalanceListener.java | 51 ------------------- .../java/de/juplo/kafka/EndlessConsumer.java | 3 +- .../juplo/kafka/GenericApplicationTests.java | 3 -- 4 files changed, 1 insertion(+), 72 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 624a4ec..e887987 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -33,25 +33,10 @@ public class ApplicationConfiguration return new AdderResults(); } - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - ApplicationProperties properties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - properties.getClientId()); - } - @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, ApplicationProperties properties) { @@ -61,7 +46,6 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - rebalanceListener, recordHandler); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index e214a14..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,51 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - Map state = recordHandler.getState(partition).getState(); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - }); - } -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..3e7310e 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,6 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -42,7 +41,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); + consumer.subscribe(Arrays.asList(topic)); while (true) { diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8849317..e63c5ce 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -67,8 +67,6 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; - @Autowired RecordHandler recordHandler; KafkaProducer testRecordProducer; @@ -376,7 +374,6 @@ abstract class GenericApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); -- 2.20.1