From a2e8fc924e5b472d6b90c42d311514f91ea452f1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 14 Aug 2022 19:04:47 +0200 Subject: [PATCH] =?utf8?q?Benennung=20vereinheitlicht=20und=20projektunabh?= =?utf8?q?=C3=A4ngig=20gemacht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 24 +++++++++---------- ...java => ApplicationRebalanceListener.java} | 18 +++++++------- ...ler.java => ApplicationRecordHandler.java} | 2 +- .../java/de/juplo/kafka/DriverController.java | 8 +++---- .../java/de/juplo/kafka/EndlessConsumer.java | 14 +++++------ .../java/de/juplo/kafka/StateDocument.java | 3 +-- ...csRepository.java => StateRepository.java} | 2 +- .../juplo/kafka/GenericApplicationTests.java | 8 +++---- 8 files changed, 40 insertions(+), 39 deletions(-) rename src/main/java/de/juplo/kafka/{AdderRebalanceListener.java => ApplicationRebalanceListener.java} (80%) rename src/main/java/de/juplo/kafka/{AdderRecordHandler.java => ApplicationRecordHandler.java} (93%) rename src/main/java/de/juplo/kafka/{PartitionStatisticsRepository.java => StateRepository.java} (66%) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 973e973..9f54083 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,21 +18,21 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public AdderRecordHandler sumRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new AdderRecordHandler(); + return new ApplicationRecordHandler(); } @Bean - public AdderRebalanceListener sumRebalanceListener( - AdderRecordHandler adderRecordHandler, - PartitionStatisticsRepository repository, + public ApplicationRebalanceListener rebalanceListener( + ApplicationRecordHandler recordHandler, + StateRepository stateRepository, Consumer consumer, ApplicationProperties properties) { - return new AdderRebalanceListener( - adderRecordHandler, - repository, + return new ApplicationRebalanceListener( + recordHandler, + stateRepository, properties.getClientId(), properties.getTopic(), Clock.systemDefaultZone(), @@ -44,8 +44,8 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - AdderRebalanceListener adderRebalanceListener, - AdderRecordHandler adderRecordHandler, + ApplicationRebalanceListener rebalanceListener, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@ -54,8 +54,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - adderRebalanceListener, - adderRecordHandler); + rebalanceListener, + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java similarity index 80% rename from src/main/java/de/juplo/kafka/AdderRebalanceListener.java rename to src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index ef595ba..542af2d 100644 --- a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -9,14 +9,15 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.Map; @RequiredArgsConstructor @Slf4j -public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { - private final AdderRecordHandler handler; - private final PartitionStatisticsRepository repository; + private final ApplicationRecordHandler recordHandler; + private final StateRepository stateRepository; private final String id; private final String topic; private final Clock clock; @@ -33,7 +34,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc { Integer partition = tp.partition(); StateDocument document = - repository + stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); log.info("{} - adding partition: {}, offset={}", id, partition, document.offset); @@ -43,7 +44,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document.state); + recordHandler.addPartition(partition, document.state); }); } @@ -61,7 +62,8 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc offset); if (commitsEnabled) { - repository.save(new StateDocument(partition, handler.removePartition(partition), offset)); + Map removed = recordHandler.removePartition(partition); + stateRepository.save(new StateDocument(partition, removed, offset)); } else { @@ -83,10 +85,10 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save( + recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( new StateDocument( partiton, - sumBusinessLogic.getState(), + adder.getState(), consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); } diff --git a/src/main/java/de/juplo/kafka/AdderRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java similarity index 93% rename from src/main/java/de/juplo/kafka/AdderRecordHandler.java rename to src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index ecd47bc..d0d385c 100644 --- a/src/main/java/de/juplo/kafka/AdderRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -8,7 +8,7 @@ import java.util.Map; @Slf4j -public class AdderRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final Map state = new HashMap<>(); diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 0870f19..d389271 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -16,7 +16,7 @@ import java.util.stream.Collectors; public class DriverController { private final EndlessConsumer consumer; - private final AdderRecordHandler adderRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -36,7 +36,7 @@ public class DriverController public Map> state() { return - adderRecordHandler + recordHandler .getState() .entrySet() .stream() @@ -48,9 +48,9 @@ public class DriverController @GetMapping("state/{user}") public ResponseEntity seen(@PathVariable String user) { - for (AdderBusinessLogic adderBusinessLogic : adderRecordHandler.getState().values()) + for (AdderBusinessLogic adder : recordHandler.getState().values()) { - Optional sum = adderBusinessLogic.getSum(user); + Optional sum = adder.getSum(user); if (sum.isPresent()) return ResponseEntity.ok(sum.get()); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index cfba6df..58374f4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,8 +25,8 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; - private final RecordHandler handler; + private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -42,8 +42,8 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - pollIntervalAwareRebalanceListener.enableCommits(); - consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); + rebalanceListener.enableCommits(); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { @@ -64,12 +64,12 @@ public class EndlessConsumer implements Runnable record.value() ); - handler.accept(record); + recordHandler.accept(record); consumed++; } - pollIntervalAwareRebalanceListener.beforeNextPoll(); + rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) @@ -93,7 +93,7 @@ public class EndlessConsumer implements Runnable catch(Exception e) { log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e); - pollIntervalAwareRebalanceListener.disableCommits(); + rebalanceListener.disableCommits(); shutdown(e); } finally diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 2583c8e..0540e3f 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -5,11 +5,10 @@ import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import java.util.HashMap; -import java.util.List; import java.util.Map; -@Document(collection = "statistics") +@Document(collection = "state") @ToString public class StateDocument { diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java similarity index 66% rename from src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java rename to src/main/java/de/juplo/kafka/StateRepository.java index 9e26410..3129535 100644 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ b/src/main/java/de/juplo/kafka/StateRepository.java @@ -5,7 +5,7 @@ import org.springframework.data.mongodb.repository.MongoRepository; import java.util.Optional; -public interface PartitionStatisticsRepository extends MongoRepository +public interface StateRepository extends MongoRepository { public Optional findById(String partition); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 711a44a..9a6f812 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -60,7 +60,7 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; + StateRepository stateRepository; @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired @@ -233,11 +233,11 @@ abstract class GenericApplicationTests log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); StateDocument document = - partitionStatisticsRepository + stateRepository .findById(partition.toString()) .orElse(new StateDocument(partition)); document.offset = offset; - partitionStatisticsRepository.save(document); + stateRepository.save(document); }); offsetConsumer.unsubscribe(); } @@ -247,7 +247,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); } -- 2.20.1