From: Kai Moritz Date: Fri, 12 Aug 2022 10:04:27 +0000 (+0200) Subject: refactor: Alle Kafka-Belange in den `WordcountRebalanceListener` verschoben X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~4^2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9511a89368c96d0b5f09d55adaaed5515c578dcc;p=demos%2Fkafka%2Ftraining refactor: Alle Kafka-Belange in den `WordcountRebalanceListener` verschoben * Dafür neues Interface `PollIntervalAwareRebalanceListener` eingeführt. * `WordcountRebalanceListener` implementiert das neue Interface und kümmert sich um alle Kafka-Belange. * `WordcountRecordHandler` kümmert sich nur noch um die Fachlogik. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0d17823..d48c027 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,17 +18,9 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public WordcountRecordHandler wordcountRecordHandler( - PartitionStatisticsRepository repository, - Consumer consumer, - ApplicationProperties properties) + public WordcountRecordHandler wordcountRecordHandler() { - return new WordcountRecordHandler( - repository, - properties.getTopic(), - Clock.systemDefaultZone(), - properties.getCommitInterval(), - consumer); + return new WordcountRecordHandler(); } @Bean @@ -42,6 +34,9 @@ public class ApplicationConfiguration wordcountRecordHandler, repository, properties.getClientId(), + properties.getTopic(), + Clock.systemDefaultZone(), + properties.getCommitInterval(), consumer); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0f3316d..58557f2 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -42,7 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); + consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); while (true) { @@ -68,7 +68,7 @@ public class EndlessConsumer implements Runnable consumed++; } - handler.beforeNextPoll(); + pollIntervalAwareRebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java new file mode 100644 index 0000000..8abec12 --- /dev/null +++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + + +public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener +{ + default void beforeNextPoll() {} +} diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java index 9a69c8f..9f2fc0f 100644 --- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java @@ -3,22 +3,28 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.Map; @RequiredArgsConstructor @Slf4j -public class WordcountRebalanceListener implements ConsumerRebalanceListener +public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { private final WordcountRecordHandler handler; private final PartitionStatisticsRepository repository; private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; private final Consumer consumer; + private Instant lastCommit = Instant.EPOCH; @Override public void onPartitionsAssigned(Collection partitions) @@ -58,4 +64,20 @@ public class WordcountRebalanceListener implements ConsumerRebalanceListener repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); }); } + + + @Override + public void beforeNextPoll() + { + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + handler.getSeen().forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics, + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } + } } diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java index bdf4b32..4efc547 100644 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java @@ -1,36 +1,21 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; -@RequiredArgsConstructor @Slf4j public class WordcountRecordHandler implements RecordHandler { final static Pattern PATTERN = Pattern.compile("\\W+"); - private final PartitionStatisticsRepository repository; - private final String topic; - private final Clock clock; - private final Duration commitInterval; - private final Consumer consumer; - private final Map>> seen = new HashMap<>(); - private Instant lastCommit = Instant.EPOCH; - @Override public void accept(ConsumerRecord record) @@ -61,22 +46,6 @@ public class WordcountRecordHandler implements RecordHandler } } - - @Override - public void beforeNextPoll() - { - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - seen.forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); - lastCommit = clock.instant(); - } - } - public void addPartition(Integer partition, Map> statistics) { seen.put(partition, statistics);