From: Kai Moritz Date: Sun, 14 Aug 2022 17:04:47 +0000 (+0200) Subject: Benennung vereinheitlicht und projektunabhängig gemacht X-Git-Tag: sumup-adder---lvm-2-tage~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a2e8fc924e5b472d6b90c42d311514f91ea452f1;p=demos%2Fkafka%2Ftraining Benennung vereinheitlicht und projektunabhängig gemacht --- diff --git a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java b/src/main/java/de/juplo/kafka/AdderRebalanceListener.java deleted file mode 100644 index ef595ba..0000000 --- a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java +++ /dev/null @@ -1,106 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; - - -@RequiredArgsConstructor -@Slf4j -public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanceListener -{ - private final AdderRecordHandler handler; - private final PartitionStatisticsRepository repository; - private final String id; - private final String topic; - private final Clock clock; - private final Duration commitInterval; - private final Consumer consumer; - - private Instant lastCommit = Instant.EPOCH; - private boolean commitsEnabled = true; - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - StateDocument document = - repository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - log.info("{} - adding partition: {}, offset={}", id, partition, document.offset); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } - handler.addPartition(partition, document.state); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - offset); - if (commitsEnabled) - { - repository.save(new StateDocument(partition, handler.removePartition(partition), offset)); - } - else - { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); - } - }); - } - - - @Override - public void beforeNextPoll() - { - if (!commitsEnabled) - { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); - return; - } - - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save( - new StateDocument( - partiton, - sumBusinessLogic.getState(), - consumer.position(new TopicPartition(topic, partiton))))); - lastCommit = clock.instant(); - } - } - - @Override - public void enableCommits() - { - commitsEnabled = true; - } - - @Override - public void disableCommits() - { - commitsEnabled = false; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderRecordHandler.java b/src/main/java/de/juplo/kafka/AdderRecordHandler.java deleted file mode 100644 index ecd47bc..0000000 --- a/src/main/java/de/juplo/kafka/AdderRecordHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; - - -@Slf4j -public class AdderRecordHandler implements RecordHandler -{ - private final Map state = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - String message = record.value(); - switch (message) - { - case "START": - state.get(partition).startSum(user); - break; - - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); - break; - - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } - - - public Map getState() - { - return state; - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 973e973..9f54083 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,21 +18,21 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public AdderRecordHandler sumRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new AdderRecordHandler(); + return new ApplicationRecordHandler(); } @Bean - public AdderRebalanceListener sumRebalanceListener( - AdderRecordHandler adderRecordHandler, - PartitionStatisticsRepository repository, + public ApplicationRebalanceListener rebalanceListener( + ApplicationRecordHandler recordHandler, + StateRepository stateRepository, Consumer consumer, ApplicationProperties properties) { - return new AdderRebalanceListener( - adderRecordHandler, - repository, + return new ApplicationRebalanceListener( + recordHandler, + stateRepository, properties.getClientId(), properties.getTopic(), Clock.systemDefaultZone(), @@ -44,8 +44,8 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - AdderRebalanceListener adderRebalanceListener, - AdderRecordHandler adderRecordHandler, + ApplicationRebalanceListener rebalanceListener, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@ -54,8 +54,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - adderRebalanceListener, - adderRecordHandler); + rebalanceListener, + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java new file mode 100644 index 0000000..542af2d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -0,0 +1,108 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + + +@RequiredArgsConstructor +@Slf4j +public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +{ + private final ApplicationRecordHandler recordHandler; + private final StateRepository stateRepository; + private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; + private final Consumer consumer; + + private Instant lastCommit = Instant.EPOCH; + private boolean commitsEnabled = true; + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + StateDocument document = + stateRepository + .findById(Integer.toString(partition)) + .orElse(new StateDocument(partition)); + log.info("{} - adding partition: {}, offset={}", id, partition, document.offset); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } + recordHandler.addPartition(partition, document.state); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info( + "{} - removing partition: {}, offset of next message {})", + id, + partition, + offset); + if (commitsEnabled) + { + Map removed = recordHandler.removePartition(partition); + stateRepository.save(new StateDocument(partition, removed, offset)); + } + else + { + log.info("Offset commits are disabled! Last commit: {}", lastCommit); + } + }); + } + + + @Override + public void beforeNextPoll() + { + if (!commitsEnabled) + { + log.info("Offset commits are disabled! Last commit: {}", lastCommit); + return; + } + + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( + new StateDocument( + partiton, + adder.getState(), + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } + } + + @Override + public void enableCommits() + { + commitsEnabled = true; + } + + @Override + public void disableCommits() + { + commitsEnabled = false; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java new file mode 100644 index 0000000..d0d385c --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -0,0 +1,54 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.HashMap; +import java.util.Map; + + +@Slf4j +public class ApplicationRecordHandler implements RecordHandler +{ + private final Map state = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String user = record.key(); + String message = record.value(); + switch (message) + { + case "START": + state.get(partition).startSum(user); + break; + + case "END": + Long result = state.get(partition).endSum(user); + log.info("New result for {}: {}", user, result); + break; + + default: + state.get(partition).addToSum(user, Integer.parseInt(message)); + break; + } + } + + protected void addPartition(Integer partition, Map state) + { + this.state.put(partition, new AdderBusinessLogic(state)); + } + + protected Map removePartition(Integer partition) + { + return this.state.remove(partition).getState(); + } + + + public Map getState() + { + return state; + } +} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 0870f19..d389271 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -16,7 +16,7 @@ import java.util.stream.Collectors; public class DriverController { private final EndlessConsumer consumer; - private final AdderRecordHandler adderRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -36,7 +36,7 @@ public class DriverController public Map> state() { return - adderRecordHandler + recordHandler .getState() .entrySet() .stream() @@ -48,9 +48,9 @@ public class DriverController @GetMapping("state/{user}") public ResponseEntity seen(@PathVariable String user) { - for (AdderBusinessLogic adderBusinessLogic : adderRecordHandler.getState().values()) + for (AdderBusinessLogic adder : recordHandler.getState().values()) { - Optional sum = adderBusinessLogic.getSum(user); + Optional sum = adder.getSum(user); if (sum.isPresent()) return ResponseEntity.ok(sum.get()); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index cfba6df..58374f4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,8 +25,8 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; - private final RecordHandler handler; + private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -42,8 +42,8 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - pollIntervalAwareRebalanceListener.enableCommits(); - consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); + rebalanceListener.enableCommits(); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { @@ -64,12 +64,12 @@ public class EndlessConsumer implements Runnable record.value() ); - handler.accept(record); + recordHandler.accept(record); consumed++; } - pollIntervalAwareRebalanceListener.beforeNextPoll(); + rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) @@ -93,7 +93,7 @@ public class EndlessConsumer implements Runnable catch(Exception e) { log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e); - pollIntervalAwareRebalanceListener.disableCommits(); + rebalanceListener.disableCommits(); shutdown(e); } finally diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java deleted file mode 100644 index 9e26410..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface PartitionStatisticsRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 2583c8e..0540e3f 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -5,11 +5,10 @@ import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import java.util.HashMap; -import java.util.List; import java.util.Map; -@Document(collection = "statistics") +@Document(collection = "state") @ToString public class StateDocument { diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java new file mode 100644 index 0000000..3129535 --- /dev/null +++ b/src/main/java/de/juplo/kafka/StateRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface StateRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 711a44a..9a6f812 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -60,7 +60,7 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; + StateRepository stateRepository; @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired @@ -233,11 +233,11 @@ abstract class GenericApplicationTests log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); StateDocument document = - partitionStatisticsRepository + stateRepository .findById(partition.toString()) .orElse(new StateDocument(partition)); document.offset = offset; - partitionStatisticsRepository.save(document); + stateRepository.save(document); }); offsetConsumer.unsubscribe(); } @@ -247,7 +247,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); }