From: Kai Moritz
Date: Sun, 14 Aug 2022 17:11:25 +0000 (+0200)
Subject: Unified the naming and made it project-independent
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=4abc82d5d4cc80feabee91a10749b40f9bcfd879;hp=-c;p=demos%2Fkafka%2Ftraining

Unified the naming and made it project-independent
---

4abc82d5d4cc80feabee91a10749b40f9bcfd879
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 3925fcb..a9d9b15 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -19,21 +19,21 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public KeyCountingRecordHandler messageCountingRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new KeyCountingRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public KeyCountingRebalanceListener wordcountRebalanceListener(
-      KeyCountingRecordHandler keyCountingRecordHandler,
-      PartitionStatisticsRepository repository,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      StateRepository stateRepository,
       Consumer<String, Long> consumer,
       ApplicationProperties properties)
   {
-    return new KeyCountingRebalanceListener(
-        keyCountingRecordHandler,
-        repository,
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        stateRepository,
         properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
@@ -45,8 +45,8 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRebalanceListener keyCountingRebalanceListener,
-      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -55,8 +55,8 @@ public class ApplicationConfiguration
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        keyCountingRebalanceListener,
-        keyCountingRecordHandler);
+        rebalanceListener,
+        recordHandler);
   }
 
   @Bean
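Note: the RecordHandler interface that ApplicationRecordHandler implements and
EndlessConsumer delegates to is not part of this diff. Judging from the
accept(ConsumerRecord) calls further down, it is presumably a plain functional
interface along these lines -- a sketch, not the committed source:

    package de.juplo.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.util.function.Consumer;

    // Presumed shape: a record handler is just a Consumer of ConsumerRecords,
    // which is why EndlessConsumer can call recordHandler.accept(record).
    public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
    {
    }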
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644
index 0000000..2fccb4f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -0,0 +1,83 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final StateRepository stateRepository;
+  private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
+  private final Consumer<String, Long> consumer;
+
+  private Instant lastCommit = Instant.EPOCH;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      StateDocument document =
+          stateRepository
+              .findById(Integer.toString(partition))
+              .orElse(new StateDocument(partition));
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
+      recordHandler.addPartition(partition, document.state);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          newOffset);
+      Map<String, Long> removed = recordHandler.removePartition(partition);
+      stateRepository.save(new StateDocument(partition, removed, consumer.position(tp)));
+    });
+  }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      recordHandler.getState().forEach((partiton, state) -> stateRepository.save(
+          new StateDocument(
+              partiton,
+              state,
+              consumer.position(new TopicPartition(topic, partiton))))));
+      lastCommit = clock.instant();
+    }
+  }
+}
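Note: the guard around consumer.seek() in onPartitionsAssigned() relies on
StateDocument (added further down in this commit) initializing its offset
field to -1l. A minimal sketch of the two cases, as a fragment that assumes
java.util.Map is imported; the key "alice" and the offset 1337 are made-up
example values:

    // First contact with a partition: no document in MongoDB yet, the
    // default offset of -1 skips the seek, so Kafka's auto.offset.reset
    // policy decides where consumption starts.
    StateDocument fresh = new StateDocument(0);
    assert fresh.offset == -1l;

    // Partition with previously flushed state: the stored offset is >= 0,
    // so the consumer seeks to it and resumes right after the last flush.
    StateDocument stored = new StateDocument(0, Map.of("alice", 42l), 1337l);
    assert stored.offset == 1337l;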
"NULL" : record.key().toString(); + Map byKey = state.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); + } + + protected void addPartition(Integer partition, Map state) + { + this.state.put(partition, state); + } + + protected Map removePartition(Integer partition) + { + return this.state.remove(partition); + } + + + public Map> getState() + { + return state; + } +} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index f6ff47f..09fb762 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final KeyCountingRecordHandler keyCountingRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -29,10 +29,10 @@ public class DriverController } - @GetMapping("seen") - public Map> seen() + @GetMapping("state") + public Map> state() { - return keyCountingRecordHandler.getSeen(); + return recordHandler.getState(); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 58557f2..17778be 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,8 +25,8 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; - private final RecordHandler handler; + private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -42,7 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { @@ -63,12 +63,12 @@ public class EndlessConsumer implements Runnable record.value() ); - handler.accept(record); + recordHandler.accept(record); consumed++; } - pollIntervalAwareRebalanceListener.beforeNextPoll(); + rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java deleted file mode 100644 index 4a2c036..0000000 --- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java +++ /dev/null @@ -1,83 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Map; - - -@RequiredArgsConstructor -@Slf4j -public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener -{ - private final KeyCountingRecordHandler handler; - private final PartitionStatisticsRepository repository; - private final String id; - private final String topic; - private final Clock clock; - private final 
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644
index 4a2c036..0000000
--- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
-  private final String id;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, Long> consumer;
-
-  private Instant lastCommit = Instant.EPOCH;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      StatisticsDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
-      if (document.offset >= 0)
-      {
-        // Only seek, if a stored offset was found
-        // Otherwise: Use initial offset, generated by Kafka
-        consumer.seek(tp, document.offset);
-      }
-      handler.addPartition(partition, document.statistics);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
-      log.info(
-          "{} - removing partition: {}, offset of next message {})",
-          id,
-          partition,
-          newOffset);
-      Map<String, Long> removed = handler.removePartition(partition);
-      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
-    });
-  }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton))))));
-      lastCommit = clock.instant();
-    }
-  }
-}
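Note: the flush logic in beforeNextPoll() is unchanged by this commit apart
from the renames: state and offsets are only written to MongoDB once per
commit interval. A self-contained sketch of the timing check, with a
hypothetical 10-second interval and a fixed clock:

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.time.ZoneOffset;

    public class FlushCheckSketch
    {
      public static void main(String[] args)
      {
        Clock clock = Clock.fixed(Instant.parse("2022-08-14T17:00:00Z"), ZoneOffset.UTC);
        Duration commitInterval = Duration.ofSeconds(10); // hypothetical value
        Instant lastCommit = Instant.parse("2022-08-14T16:59:49Z");

        // 11 seconds have passed, which is more than the interval, so the
        // listener would persist all state and advance lastCommit. Starting
        // from Instant.EPOCH guarantees a flush on the very first check.
        System.out.println(lastCommit.plus(commitInterval).isBefore(clock.instant())); // true
      }
    }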
"NULL" : record.key().toString(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - - public void addPartition(Integer partition, Map statistics) - { - seen.put(partition, statistics); - } - - public Map removePartition(Integer partition) - { - return seen.remove(partition); - } - - - public Map> getSeen() - { - return seen; - } -} diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java deleted file mode 100644 index 0ccf3cd..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface PartitionStatisticsRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java new file mode 100644 index 0000000..bb1c701 --- /dev/null +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -0,0 +1,36 @@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "state") +@ToString +public class StateDocument +{ + @Id + public String id; + public long offset = -1l; + public Map state; + + public StateDocument() + { + } + + public StateDocument(Integer partition) + { + this.id = Integer.toString(partition); + this.state = new HashMap<>(); + } + + public StateDocument(Integer partition, Map state, long offset) + { + this.id = Integer.toString(partition); + this.state = state; + this.offset = offset; + } +} diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java new file mode 100644 index 0000000..3129535 --- /dev/null +++ b/src/main/java/de/juplo/kafka/StateRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface StateRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java deleted file mode 100644 index 1244f45..0000000 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka; - -import lombok.ToString; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -import java.util.HashMap; -import java.util.Map; - - -@Document(collection = "statistics") -@ToString -public class StatisticsDocument -{ - @Id - public String id; - public long offset = -1l; - public Map statistics; - - public StatisticsDocument() - { - } - - public StatisticsDocument(Integer partition) - { - this.id = Integer.toString(partition); - this.statistics = new HashMap<>(); - } - - public StatisticsDocument(Integer partition, Map statistics, long offset) - { - this.id = Integer.toString(partition); - this.statistics = statistics; - this.offset = offset; - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index fc5d4c9..1f18e59 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -64,17 +64,15 @@ class ApplicationTests
   @Autowired
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
   @Autowired
-  PartitionStatisticsRepository partitionStatisticsRepository;
-  @Autowired
   ApplicationProperties properties;
   @Autowired
   ExecutorService executor;
   @Autowired
-  PartitionStatisticsRepository repository;
+  StateRepository stateRepository;
   @Autowired
-  KeyCountingRebalanceListener keyCountingRebalanceListener;
+  ApplicationRebalanceListener rebalanceListener;
   @Autowired
-  KeyCountingRecordHandler keyCountingRecordHandler;
+  ApplicationRecordHandler recordHandler;
 
   EndlessConsumer<String, Long> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
@@ -196,12 +194,12 @@ class ApplicationTests
       Long offset = offsetConsumer.position(tp);
       log.info("New position for {}: {}", tp, offset);
       Integer partition = tp.partition();
-      StatisticsDocument document =
-        partitionStatisticsRepository
+      StateDocument document =
+        stateRepository
           .findById(partition.toString())
-          .orElse(new StatisticsDocument(partition));
+          .orElse(new StateDocument(partition));
       document.offset = offset;
-      partitionStatisticsRepository.save(document);
+      stateRepository.save(document);
     });
     offsetConsumer.unsubscribe();
   }
@@ -211,7 +209,7 @@ class ApplicationTests
     partitions().forEach(tp ->
     {
       String partition = Integer.toString(tp.partition());
-      Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+      Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
       consumer.accept(tp, offset.orElse(0l));
     });
   }
@@ -283,7 +281,7 @@ class ApplicationTests
     });
 
     TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-      new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+      new TestRecordHandler<String, Long>(recordHandler) {
         @Override
         public void onNewRecord(ConsumerRecord<String, Long> record)
         {
@@ -300,7 +298,7 @@ class ApplicationTests
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        keyCountingRebalanceListener,
+        rebalanceListener,
         captureOffsetAndExecuteTestHandler);
 
     endlessConsumer.start();
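Note: TestRecordHandler, which the test uses above to capture offsets before
delegating to the real handler, is not part of this diff. Judging from its
usage, it is presumably a delegating wrapper along these lines -- a sketch,
not the committed source:

    package de.juplo.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Presumed shape: run the test hook first, then hand the record on
    // to the wrapped production handler.
    public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
    {
      private final RecordHandler<K, V> delegate;

      public TestRecordHandler(RecordHandler<K, V> delegate)
      {
        this.delegate = delegate;
      }

      public abstract void onNewRecord(ConsumerRecord<K, V> record);

      @Override
      public void accept(ConsumerRecord<K, V> record)
      {
        this.onNewRecord(record);
        delegate.accept(record);
      }
    }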