From fc682d9890787ef363b3e189f6f880a043f3c541 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Aug 2022 11:53:46 +0200 Subject: [PATCH] refactor: Handling der Partitionen in WordcountRebalanceListener --- .../juplo/kafka/ApplicationConfiguration.java | 12 ++++-- .../java/de/juplo/kafka/RecordHandler.java | 5 --- .../kafka/WordcountRebalanceListener.java | 40 +++++++++++++++++-- .../juplo/kafka/WordcountRecordHandler.java | 32 ++------------- .../de/juplo/kafka/TestRecordHandler.java | 13 ------ 5 files changed, 50 insertions(+), 52 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index da1605b..0d17823 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -25,7 +25,6 @@ public class ApplicationConfiguration { return new WordcountRecordHandler( repository, - properties.getClientId(), properties.getTopic(), Clock.systemDefaultZone(), properties.getCommitInterval(), @@ -34,9 +33,16 @@ public class ApplicationConfiguration @Bean public WordcountRebalanceListener wordcountRebalanceListener( - WordcountRecordHandler wordcountRecordHandler) + WordcountRecordHandler wordcountRecordHandler, + PartitionStatisticsRepository repository, + Consumer consumer, + ApplicationProperties properties) { - return new WordcountRebalanceListener(wordcountRecordHandler); + return new WordcountRebalanceListener( + wordcountRecordHandler, + repository, + properties.getClientId(), + consumer); } @Bean diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index ff2f193..3c9dd15 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -1,7 +1,6 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; import java.util.function.Consumer; @@ -9,8 +8,4 @@ import java.util.function.Consumer; public interface RecordHandler extends Consumer> { default void beforeNextPoll() {} - - default void onPartitionAssigned(TopicPartition tp) {} - - default void onPartitionRevoked(TopicPartition tp) {} } diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java index fd551c2..9a69c8f 100644 --- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java @@ -1,27 +1,61 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.util.Collection; +import java.util.Map; @RequiredArgsConstructor +@Slf4j public class WordcountRebalanceListener implements ConsumerRebalanceListener { - private final RecordHandler handler; + private final WordcountRecordHandler handler; + private final PartitionStatisticsRepository repository; + private final String id; + private final Consumer consumer; @Override public void onPartitionsAssigned(Collection partitions) { - partitions.forEach(tp -> handler.onPartitionAssigned(tp)); + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + StatisticsDocument document = + repository + .findById(Integer.toString(partition)) + .orElse(new StatisticsDocument(partition)); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } + handler.addPartition(partition, document.statistics); + }); } @Override public void onPartitionsRevoked(Collection partitions) { - partitions.forEach(tp -> handler.onPartitionRevoked(tp)); + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + log.info( + "{} - removing partition: {}, offset of next message {})", + id, + partition, + newOffset); + Map> removed = handler.removePartition(partition); + repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + }); } } diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java index 5981c7d..bdf4b32 100644 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java @@ -22,7 +22,6 @@ public class WordcountRecordHandler implements RecordHandler private final PartitionStatisticsRepository repository; - private final String id; private final String topic; private final Clock clock; private final Duration commitInterval; @@ -78,37 +77,14 @@ public class WordcountRecordHandler implements RecordHandler } } - @Override - public void onPartitionAssigned(TopicPartition tp) + public void addPartition(Integer partition, Map> statistics) { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = - repository - .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } - seen.put(partition, document.statistics); + seen.put(partition, statistics); } - @Override - public void onPartitionRevoked(TopicPartition tp) + public Map> removePartition(Integer partition) { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - newOffset); - Map> removed = seen.remove(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + return seen.remove(partition); } diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index 4047093..de28385 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; @RequiredArgsConstructor @@ -26,16 +25,4 @@ public abstract class TestRecordHandler implements RecordHandler { handler.beforeNextPoll(); } - - @Override - public void onPartitionAssigned(TopicPartition tp) - { - handler.onPartitionAssigned(tp); - } - - @Override - public void onPartitionRevoked(TopicPartition tp) - { - handler.onPartitionRevoked(tp); - } } -- 2.20.1