From: Kai Moritz
Date: Fri, 12 Aug 2022 15:32:24 +0000 (+0200)
Subject: Refactorings ported back from 'wordcount' to 'stored-offsets'
X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~4
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b3777fba0ae679d9e2c9d36626fa208a952f83e8;p=demos%2Fkafka%2Ftraining

Refactorings ported back from 'wordcount' to 'stored-offsets'
---

b3777fba0ae679d9e2c9d36626fa208a952f83e8
diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 9b06b09,d48c027..8e2e867
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -1,8 -1,7 +1,8 @@@
  package de.juplo.kafka;

- import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
@@@ -20,32 -18,44 +19,44 @@@ import java.util.concurrent.Executors
  public class ApplicationConfiguration
  {
    @Bean
-   public Consumer<ConsumerRecord<String, Long>> consumer()
-   public WordcountRecordHandler wordcountRecordHandler()
++  public KeyCountingRecordHandler messageCountingRecordHandler()
    {
-     return (record) ->
-     {
-       // Handle record
-     };
-     return new WordcountRecordHandler();
++    return new KeyCountingRecordHandler();
+   }
+
+   @Bean
-   public WordcountRebalanceListener wordcountRebalanceListener(
-     WordcountRecordHandler wordcountRecordHandler,
++  public KeyCountingRebalanceListener wordcountRebalanceListener(
++    KeyCountingRecordHandler keyCountingRecordHandler,
+     PartitionStatisticsRepository repository,
-     Consumer<String, String> consumer,
++    Consumer<String, Long> consumer,
+     ApplicationProperties properties)
+   {
-     return new WordcountRebalanceListener(
-       wordcountRecordHandler,
++    return new KeyCountingRebalanceListener(
++      keyCountingRecordHandler,
+       repository,
+       properties.getClientId(),
+       properties.getTopic(),
+       Clock.systemDefaultZone(),
+       properties.getCommitInterval(),
+       consumer);
    }

    @Bean
-   public EndlessConsumer<String, String> endlessConsumer(
-     KafkaConsumer<String, String> kafkaConsumer,
+   public EndlessConsumer<String, Long> endlessConsumer(
+     KafkaConsumer<String, Long> kafkaConsumer,
      ExecutorService executor,
-     Consumer<ConsumerRecord<String, Long>> handler,
-     PartitionStatisticsRepository repository,
-     WordcountRebalanceListener wordcountRebalanceListener,
-     WordcountRecordHandler wordcountRecordHandler,
++    KeyCountingRebalanceListener keyCountingRebalanceListener,
++    KeyCountingRecordHandler keyCountingRecordHandler,
      ApplicationProperties properties)
    {
      return new EndlessConsumer<>(
        executor,
-       repository,
        properties.getClientId(),
        properties.getTopic(),
-       Clock.systemDefaultZone(),
-       properties.getCommitInterval(),
        kafkaConsumer,
-       handler);
-       wordcountRebalanceListener,
-       wordcountRecordHandler);
++      keyCountingRebalanceListener,
++      keyCountingRecordHandler);
    }

    @Bean

diff --cc src/main/java/de/juplo/kafka/DriverController.java
index ed38080,5d6c1a8..f6ff47f
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@@ -2,11 -2,8 +2,7 @@@ package de.juplo.kafka

  import lombok.RequiredArgsConstructor;
  import org.springframework.http.HttpStatus;
- import org.springframework.web.bind.annotation.ExceptionHandler;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.ResponseStatus;
- import org.springframework.web.bind.annotation.RestController;
-import org.springframework.http.ResponseEntity;
+ import org.springframework.web.bind.annotation.*;

  import java.util.Map;
  import java.util.concurrent.ExecutionException;

@@@ -17,6 -14,7 +13,7 @@@ public class DriverController
  {
    private final EndlessConsumer consumer;
-   private final WordcountRecordHandler wordcount;
++  private final KeyCountingRecordHandler keyCountingRecordHandler;

    @PostMapping("start")
@@@ -33,9 -31,22 +30,9 @@@

    @GetMapping("seen")
-   public Map<Integer, Map<String, Map<String, Long>>> seen()
+   public Map<Integer, Map<String, Long>> seen()
    {
-     return consumer.getSeen();
-     return wordcount.getSeen();
-   }
-
-   @GetMapping("seen/{user}")
-   public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
-   {
-     for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
-     {
-       Map<String, Long> words = users.get(user);
-       if (words != null)
-         return ResponseEntity.ok(words);
-     }
-
-     return ResponseEntity.notFound().build();
++    return keyCountingRecordHandler.getSeen();
    }

diff --cc src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
index 0000000,0000000..4a2c036
new file
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@@ -1,0 -1,0 +1,83 @@@
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.Consumer;
++import org.apache.kafka.common.TopicPartition;
++
++import java.time.Clock;
++import java.time.Duration;
++import java.time.Instant;
++import java.util.Collection;
++import java.util.Map;
++
++
++@RequiredArgsConstructor
++@Slf4j
++public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
++{
++  private final KeyCountingRecordHandler handler;
++  private final PartitionStatisticsRepository repository;
++  private final String id;
++  private final String topic;
++  private final Clock clock;
++  private final Duration commitInterval;
++  private final Consumer<String, Long> consumer;
++
++  private Instant lastCommit = Instant.EPOCH;
++
++  @Override
++  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      Long offset = consumer.position(tp);
++      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
++      StatisticsDocument document =
++          repository
++              .findById(Integer.toString(partition))
++              .orElse(new StatisticsDocument(partition));
++      if (document.offset >= 0)
++      {
++        // Only seek, if a stored offset was found
++        // Otherwise: Use initial offset, generated by Kafka
++        consumer.seek(tp, document.offset);
++      }
++      handler.addPartition(partition, document.statistics);
++    });
++  }
++
++  @Override
++  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      Long newOffset = consumer.position(tp);
++      log.info(
++          "{} - removing partition: {}, offset of next message {}",
++          id,
++          partition,
++          newOffset);
++      Map<String, Long> removed = handler.removePartition(partition);
++      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
++    });
++  }
++
++
++  @Override
++  public void beforeNextPoll()
++  {
++    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
++    {
++      log.debug("Storing data and offsets, last commit: {}", lastCommit);
++      handler.getSeen().forEach((partition, statistics) -> repository.save(
++          new StatisticsDocument(
++              partition,
++              statistics,
++              consumer.position(new TopicPartition(topic, partition)))));
++      lastCommit = clock.instant();
++    }
++  }
++}

diff --cc src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
index 0000000,0000000..099dcf7
new file
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
@@@ -1,0 -1,0 +1,46 @@@
++package de.juplo.kafka;
++
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.ConsumerRecord;
++
++import java.util.HashMap;
++import java.util.Map;
++
++
++@Slf4j
++public class KeyCountingRecordHandler implements RecordHandler<String, Long>
++{
++  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
++
++
++  @Override
++  public void accept(ConsumerRecord<String, Long> record)
++  {
++    Integer partition = record.partition();
++    String key = record.key() == null ? "NULL" : record.key().toString();
++    Map<String, Long> byKey = seen.get(partition);
++
++    if (!byKey.containsKey(key))
++      byKey.put(key, 0l);
++
++    long seenByKey = byKey.get(key);
++    seenByKey++;
++    byKey.put(key, seenByKey);
++  }
++
++  public void addPartition(Integer partition, Map<String, Long> statistics)
++  {
++    seen.put(partition, statistics);
++  }
++
++  public Map<String, Long> removePartition(Integer partition)
++  {
++    return seen.remove(partition);
++  }
++
++
++  public Map<Integer, Map<String, Long>> getSeen()
++  {
++    return seen;
++  }
++}

diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java
index 431431b,f4c2104..a632a89
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -21,7 -20,7 +21,6 @@@ import org.springframework.kafka.test.c
  import org.springframework.test.context.TestPropertySource;
  import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

--import java.time.Clock;
  import java.time.Duration;
  import java.util.*;
  import java.util.concurrent.ExecutionException;

@@@ -72,12 -70,15 +70,15 @@@ class ApplicationTest
    ExecutorService executor;
    @Autowired
    PartitionStatisticsRepository repository;
+   @Autowired
-   WordcountRebalanceListener wordcountRebalanceListener;
++  KeyCountingRebalanceListener keyCountingRebalanceListener;
+   @Autowired
-   WordcountRecordHandler wordcountRecordHandler;
++  KeyCountingRecordHandler keyCountingRecordHandler;

-   Consumer<ConsumerRecord<String, Long>> testHandler;
-   EndlessConsumer<String, String> endlessConsumer;
+   EndlessConsumer<String, Long> endlessConsumer;
    Map<TopicPartition, Long> oldOffsets;
    Map<TopicPartition, Long> newOffsets;
-   Set<ConsumerRecord<String, String>> receivedRecords;
+   Set<ConsumerRecord<String, Long>> receivedRecords;

    /** Tests methods */

@@@ -185,7 -150,7 +186,7 @@@
        Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
        consumer.accept(tp, offset.orElse(0l));
      });
--  }
++  }

    List<TopicPartition> partitions()
    {

@@@ -254,25 -217,25 +253,25 @@@
        newOffsets.put(tp, offset - 1);
      });

-     Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
-       record ->
-       {
-         newOffsets.put(
-           new TopicPartition(record.topic(), record.partition()),
-           record.offset());
-         receivedRecords.add(record);
-         testHandler.accept(record);
-     TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
-       new TestRecordHandler<String, String>(wordcountRecordHandler) {
++    TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
++      new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+       @Override
-       public void onNewRecord(ConsumerRecord<String, String> record)
++      public void onNewRecord(ConsumerRecord<String, Long> record)
+       {
+         newOffsets.put(
+           new TopicPartition(record.topic(), record.partition()),
+           record.offset());
+         receivedRecords.add(record);
+       }
      };

    endlessConsumer = new EndlessConsumer<>(
      executor,
-     repository,
      properties.getClientId(),
      properties.getTopic(),
-     Clock.systemDefaultZone(),
-     properties.getCommitInterval(),
      kafkaConsumer,
-     wordcountRebalanceListener,
++    keyCountingRebalanceListener,
      captureOffsetAndExecuteTestHandler);

    endlessConsumer.start();
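
For orientation, the sketch below exercises the back-ported KeyCountingRecordHandler in
isolation, without Spring or a running Kafka cluster. It is an illustration added for this
write-up, not part of the commit: the class KeyCountingRecordHandlerExample, the topic
"test-topic" and the record keys and values are made-up example data; only the
ConsumerRecord(topic, partition, offset, key, value) convenience constructor from
kafka-clients is used to fake incoming records.

    package de.juplo.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.HashMap;

    // Hypothetical demo class, not part of the commit shown above.
    public class KeyCountingRecordHandlerExample
    {
      public static void main(String[] args)
      {
        KeyCountingRecordHandler handler = new KeyCountingRecordHandler();

        // A partition must be registered before records for it arrive;
        // in the application this is done by
        // KeyCountingRebalanceListener.onPartitionsAssigned(), which also
        // loads the previously stored statistics from the repository.
        handler.addPartition(0, new HashMap<>());

        // Fake three records with two distinct keys on partition 0
        // (topic name, offsets and Long values are arbitrary).
        handler.accept(new ConsumerRecord<>("test-topic", 0, 0L, "alice", 1L));
        handler.accept(new ConsumerRecord<>("test-topic", 0, 1L, "bob", 2L));
        handler.accept(new ConsumerRecord<>("test-topic", 0, 2L, "alice", 3L));

        // Prints {0={alice=2, bob=1}} (map ordering may vary)
        System.out.println(handler.getSeen());
      }
    }

Registering the partition first mirrors the split of responsibilities that this commit
back-ports from 'wordcount': the rebalance listener owns loading, seeking and persisting
per-partition state, while the record handler only counts keys.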