From: Kai Moritz Date: Sun, 21 Aug 2022 11:06:19 +0000 (+0200) Subject: Vorlage vereinfacht: Rebalance-Listener entfernt X-Git-Tag: sumup-adder--vorlage---lvm-2-tage~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=924fb13cf514f79d2b01206476c2dea8536c8d44;p=demos%2Fkafka%2Ftraining Vorlage vereinfacht: Rebalance-Listener entfernt --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 624a4ec..e887987 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -33,25 +33,10 @@ public class ApplicationConfiguration return new AdderResults(); } - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - ApplicationProperties properties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - properties.getClientId()); - } - @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, ApplicationProperties properties) { @@ -61,7 +46,6 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - rebalanceListener, recordHandler); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index e214a14..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,51 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - Map state = recordHandler.getState(partition).getState(); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - }); - } -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..3e7310e 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,6 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -42,7 +41,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); + consumer.subscribe(Arrays.asList(topic)); while (true) { diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8849317..e63c5ce 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -67,8 +67,6 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - ConsumerRebalanceListener rebalanceListener; - @Autowired RecordHandler recordHandler; KafkaProducer testRecordProducer; @@ -376,7 +374,6 @@ abstract class GenericApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();