From 4abc82d5d4cc80feabee91a10749b40f9bcfd879 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 14 Aug 2022 19:11:25 +0200 Subject: [PATCH] =?utf8?q?Benennung=20vereinheitlicht=20und=20projektunabh?= =?utf8?q?=C3=A4ngig=20gemacht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 24 +++++++++---------- ...java => ApplicationRebalanceListener.java} | 24 +++++++++---------- ...ler.java => ApplicationRecordHandler.java} | 18 +++++++------- .../java/de/juplo/kafka/DriverController.java | 8 +++---- .../java/de/juplo/kafka/EndlessConsumer.java | 10 ++++---- .../kafka/PartitionStatisticsRepository.java | 11 --------- ...isticsDocument.java => StateDocument.java} | 16 ++++++------- .../java/de/juplo/kafka/StateRepository.java | 11 +++++++++ .../java/de/juplo/kafka/ApplicationTests.java | 22 ++++++++--------- 9 files changed, 71 insertions(+), 73 deletions(-) rename src/main/java/de/juplo/kafka/{KeyCountingRebalanceListener.java => ApplicationRebalanceListener.java} (75%) rename src/main/java/de/juplo/kafka/{KeyCountingRecordHandler.java => ApplicationRecordHandler.java} (54%) delete mode 100644 src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java rename src/main/java/de/juplo/kafka/{StatisticsDocument.java => StateDocument.java} (54%) create mode 100644 src/main/java/de/juplo/kafka/StateRepository.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3925fcb..a9d9b15 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,21 +19,21 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public KeyCountingRecordHandler messageCountingRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new KeyCountingRecordHandler(); + return new ApplicationRecordHandler(); } @Bean - public KeyCountingRebalanceListener wordcountRebalanceListener( - KeyCountingRecordHandler keyCountingRecordHandler, - PartitionStatisticsRepository repository, + public ApplicationRebalanceListener rebalanceListener( + ApplicationRecordHandler recordHandler, + StateRepository stateRepository, Consumer consumer, ApplicationProperties properties) { - return new KeyCountingRebalanceListener( - keyCountingRecordHandler, - repository, + return new ApplicationRebalanceListener( + recordHandler, + stateRepository, properties.getClientId(), properties.getTopic(), Clock.systemDefaultZone(), @@ -45,8 +45,8 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - KeyCountingRebalanceListener keyCountingRebalanceListener, - KeyCountingRecordHandler keyCountingRecordHandler, + ApplicationRebalanceListener rebalanceListener, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@ -55,8 +55,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, - keyCountingRecordHandler); + rebalanceListener, + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java similarity index 75% rename from src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java rename to src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 4a2c036..2fccb4f 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -14,10 +14,10 @@ import java.util.Map; @RequiredArgsConstructor @Slf4j -public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { - private final KeyCountingRecordHandler handler; - private final PartitionStatisticsRepository repository; + private final ApplicationRecordHandler recordHandler; + private final StateRepository stateRepository; private final String id; private final String topic; private final Clock clock; @@ -34,17 +34,17 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe Integer partition = tp.partition(); Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = - repository + StateDocument document = + stateRepository .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); if (document.offset >= 0) { // Only seek, if a stored offset was found // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document.statistics); + recordHandler.addPartition(partition, document.state); }); } @@ -60,8 +60,8 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe id, partition, newOffset); - Map removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + Map removed = recordHandler.removePartition(partition); + stateRepository.save(new StateDocument(partition, removed, consumer.position(tp))); }); } @@ -72,10 +72,10 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( + recordHandler.getState().forEach((partiton, state) -> stateRepository.save( + new StateDocument( partiton, - statistics, + state, consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); } diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java similarity index 54% rename from src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java rename to src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 099dcf7..c2c2657 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -8,9 +8,9 @@ import java.util.Map; @Slf4j -public class KeyCountingRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { - private final Map> seen = new HashMap<>(); + private final Map> state = new HashMap<>(); @Override @@ -18,7 +18,7 @@ public class KeyCountingRecordHandler implements RecordHandler { Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key().toString(); - Map byKey = seen.get(partition); + Map byKey = state.get(partition); if (!byKey.containsKey(key)) byKey.put(key, 0l); @@ -28,19 +28,19 @@ public class KeyCountingRecordHandler implements RecordHandler byKey.put(key, seenByKey); } - public void addPartition(Integer partition, Map statistics) + protected void addPartition(Integer partition, Map state) { - seen.put(partition, statistics); + this.state.put(partition, state); } - public Map removePartition(Integer partition) + protected Map removePartition(Integer partition) { - return seen.remove(partition); + return this.state.remove(partition); } - public Map> getSeen() + public Map> getState() { - return seen; + return state; } } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index f6ff47f..09fb762 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final KeyCountingRecordHandler keyCountingRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -29,10 +29,10 @@ public class DriverController } - @GetMapping("seen") - public Map> seen() + @GetMapping("state") + public Map> state() { - return keyCountingRecordHandler.getSeen(); + return recordHandler.getState(); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 58557f2..17778be 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,8 +25,8 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; - private final RecordHandler handler; + private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -42,7 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { @@ -63,12 +63,12 @@ public class EndlessConsumer implements Runnable record.value() ); - handler.accept(record); + recordHandler.accept(record); consumed++; } - pollIntervalAwareRebalanceListener.beforeNextPoll(); + rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java deleted file mode 100644 index 0ccf3cd..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface PartitionStatisticsRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java similarity index 54% rename from src/main/java/de/juplo/kafka/StatisticsDocument.java rename to src/main/java/de/juplo/kafka/StateDocument.java index 1244f45..bb1c701 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -8,29 +8,29 @@ import java.util.HashMap; import java.util.Map; -@Document(collection = "statistics") +@Document(collection = "state") @ToString -public class StatisticsDocument +public class StateDocument { @Id public String id; public long offset = -1l; - public Map statistics; + public Map state; - public StatisticsDocument() + public StateDocument() { } - public StatisticsDocument(Integer partition) + public StateDocument(Integer partition) { this.id = Integer.toString(partition); - this.statistics = new HashMap<>(); + this.state = new HashMap<>(); } - public StatisticsDocument(Integer partition, Map statistics, long offset) + public StateDocument(Integer partition, Map state, long offset) { this.id = Integer.toString(partition); - this.statistics = statistics; + this.state = state; this.offset = offset; } } diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java new file mode 100644 index 0000000..3129535 --- /dev/null +++ b/src/main/java/de/juplo/kafka/StateRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface StateRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fc5d4c9..1f18e59 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -64,17 +64,15 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; - @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; + StateRepository stateRepository; @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; + ApplicationRebalanceListener rebalanceListener; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -196,12 +194,12 @@ class ApplicationTests Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); - StatisticsDocument document = - partitionStatisticsRepository + StateDocument document = + stateRepository .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); document.offset = offset; - partitionStatisticsRepository.save(document); + stateRepository.save(document); }); offsetConsumer.unsubscribe(); } @@ -211,7 +209,7 @@ class ApplicationTests partitions().forEach(tp -> { String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); } @@ -283,7 +281,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -300,7 +298,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); -- 2.20.1