From c808810e9e33afe33b29f7fd3921023ecd15483d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 13 Aug 2022 15:15:43 +0200 Subject: [PATCH] TMP --- .../juplo/kafka/ApplicationConfiguration.java | 20 +++++++++---------- .../java/de/juplo/kafka/DriverController.java | 2 +- ...istener.java => SumRebalanceListener.java} | 4 ++-- ...cordHandler.java => SumRecordHandler.java} | 2 +- .../java/de/juplo/kafka/ApplicationTests.java | 8 ++++---- 5 files changed, 18 insertions(+), 18 deletions(-) rename src/main/java/de/juplo/kafka/{WordcountRebalanceListener.java => SumRebalanceListener.java} (94%) rename src/main/java/de/juplo/kafka/{WordcountRecordHandler.java => SumRecordHandler.java} (94%) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d48c027..3be8f95 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,20 +18,20 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public WordcountRecordHandler wordcountRecordHandler() + public SumRecordHandler sumRecordHandler() { - return new WordcountRecordHandler(); + return new SumRecordHandler(); } @Bean - public WordcountRebalanceListener wordcountRebalanceListener( - WordcountRecordHandler wordcountRecordHandler, + public SumRebalanceListener sumRebalanceListener( + SumRecordHandler sumRecordHandler, PartitionStatisticsRepository repository, Consumer consumer, ApplicationProperties properties) { - return new WordcountRebalanceListener( - wordcountRecordHandler, + return new SumRebalanceListener( + sumRecordHandler, repository, properties.getClientId(), properties.getTopic(), @@ -44,8 +44,8 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - WordcountRebalanceListener wordcountRebalanceListener, - WordcountRecordHandler wordcountRecordHandler, + SumRebalanceListener sumRebalanceListener, + SumRecordHandler sumRecordHandler, ApplicationProperties properties) { return @@ -54,8 +54,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, - wordcountRecordHandler); + sumRebalanceListener, + sumRecordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 5d6c1a8..5a09c1b 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -14,7 +14,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final WordcountRecordHandler wordcount; + private final SumRecordHandler wordcount; @PostMapping("start") diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java similarity index 94% rename from src/main/java/de/juplo/kafka/WordcountRebalanceListener.java rename to src/main/java/de/juplo/kafka/SumRebalanceListener.java index 9f2fc0f..1cd738f 100644 --- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -14,9 +14,9 @@ import java.util.Map; @RequiredArgsConstructor @Slf4j -public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { - private final WordcountRecordHandler handler; + private final SumRecordHandler handler; private final PartitionStatisticsRepository repository; private final String id; private final String topic; diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java similarity index 94% rename from src/main/java/de/juplo/kafka/WordcountRecordHandler.java rename to src/main/java/de/juplo/kafka/SumRecordHandler.java index 4efc547..82ada38 100644 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -9,7 +9,7 @@ import java.util.regex.Pattern; @Slf4j -public class WordcountRecordHandler implements RecordHandler +public class SumRecordHandler implements RecordHandler { final static Pattern PATTERN = Pattern.compile("\\W+"); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index aa3dfd6..09614b8 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -71,9 +71,9 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; @Autowired - WordcountRebalanceListener wordcountRebalanceListener; + SumRebalanceListener sumRebalanceListener; @Autowired - WordcountRecordHandler wordcountRecordHandler; + SumRecordHandler sumRecordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -243,7 +243,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(wordcountRecordHandler) { + new TestRecordHandler(sumRecordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -260,7 +260,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, + sumRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); -- 2.20.1