From: Kai Moritz Date: Sat, 13 Aug 2022 13:15:43 +0000 (+0200) Subject: TMP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=c808810e9e33afe33b29f7fd3921023ecd15483d;p=demos%2Fkafka%2Ftraining TMP --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d48c027..3be8f95 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,20 +18,20 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public WordcountRecordHandler wordcountRecordHandler() + public SumRecordHandler sumRecordHandler() { - return new WordcountRecordHandler(); + return new SumRecordHandler(); } @Bean - public WordcountRebalanceListener wordcountRebalanceListener( - WordcountRecordHandler wordcountRecordHandler, + public SumRebalanceListener sumRebalanceListener( + SumRecordHandler sumRecordHandler, PartitionStatisticsRepository repository, Consumer consumer, ApplicationProperties properties) { - return new WordcountRebalanceListener( - wordcountRecordHandler, + return new SumRebalanceListener( + sumRecordHandler, repository, properties.getClientId(), properties.getTopic(), @@ -44,8 +44,8 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - WordcountRebalanceListener wordcountRebalanceListener, - WordcountRecordHandler wordcountRecordHandler, + SumRebalanceListener sumRebalanceListener, + SumRecordHandler sumRecordHandler, ApplicationProperties properties) { return @@ -54,8 +54,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, - wordcountRecordHandler); + sumRebalanceListener, + sumRecordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 5d6c1a8..5a09c1b 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -14,7 +14,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final WordcountRecordHandler wordcount; + private final SumRecordHandler wordcount; @PostMapping("start") diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java new file mode 100644 index 0000000..1cd738f --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -0,0 +1,83 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + + +@RequiredArgsConstructor +@Slf4j +public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +{ + private final SumRecordHandler handler; + private final PartitionStatisticsRepository repository; + private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; + private final Consumer consumer; + + private Instant lastCommit = Instant.EPOCH; + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + StatisticsDocument document = + repository + .findById(Integer.toString(partition)) + .orElse(new StatisticsDocument(partition)); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } + handler.addPartition(partition, document.statistics); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + log.info( + "{} - removing partition: {}, offset of next message {})", + id, + partition, + newOffset); + Map> removed = handler.removePartition(partition); + repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + }); + } + + + @Override + public void beforeNextPoll() + { + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + handler.getSeen().forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics, + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java new file mode 100644 index 0000000..82ada38 --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -0,0 +1,64 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + + +@Slf4j +public class SumRecordHandler implements RecordHandler +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + + private final Map>> seen = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String user = record.key(); + Map> users = seen.get(partition); + + Map words = users.get(user); + if (words == null) + { + words = new HashMap<>(); + users.put(user, words); + } + + for (String word : PATTERN.split(record.value())) + { + Long num = words.get(word); + if (num == null) + { + num = 1l; + } + else + { + num++; + } + words.put(word, num); + } + } + + public void addPartition(Integer partition, Map> statistics) + { + seen.put(partition, statistics); + } + + public Map> removePartition(Integer partition) + { + return seen.remove(partition); + } + + + public Map>> getSeen() + { + return seen; + } +} diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java deleted file mode 100644 index 9f2fc0f..0000000 --- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java +++ /dev/null @@ -1,83 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Map; - - -@RequiredArgsConstructor -@Slf4j -public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener -{ - private final WordcountRecordHandler handler; - private final PartitionStatisticsRepository repository; - private final String id; - private final String topic; - private final Clock clock; - private final Duration commitInterval; - private final Consumer consumer; - - private Instant lastCommit = Instant.EPOCH; - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = - repository - .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } - handler.addPartition(partition, document.statistics); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - newOffset); - Map> removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); - }); - } - - - @Override - public void beforeNextPoll() - { - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); - lastCommit = clock.instant(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java deleted file mode 100644 index 4efc547..0000000 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Pattern; - - -@Slf4j -public class WordcountRecordHandler implements RecordHandler -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - - private final Map>> seen = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - Map> users = seen.get(partition); - - Map words = users.get(user); - if (words == null) - { - words = new HashMap<>(); - users.put(user, words); - } - - for (String word : PATTERN.split(record.value())) - { - Long num = words.get(word); - if (num == null) - { - num = 1l; - } - else - { - num++; - } - words.put(word, num); - } - } - - public void addPartition(Integer partition, Map> statistics) - { - seen.put(partition, statistics); - } - - public Map> removePartition(Integer partition) - { - return seen.remove(partition); - } - - - public Map>> getSeen() - { - return seen; - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index aa3dfd6..09614b8 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -71,9 +71,9 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; @Autowired - WordcountRebalanceListener wordcountRebalanceListener; + SumRebalanceListener sumRebalanceListener; @Autowired - WordcountRecordHandler wordcountRecordHandler; + SumRecordHandler sumRecordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -243,7 +243,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(wordcountRecordHandler) { + new TestRecordHandler(sumRecordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -260,7 +260,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, + sumRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();