From 858cf4734c21b1f9de57fbbe10ef1513c86df2a6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Jul 2022 11:10:45 +0200 Subject: [PATCH] WIP --- pom.xml | 5 +++-- src/main/java/de/juplo/kafka/Application.java | 6 +++--- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 4 ++-- .../java/de/juplo/kafka/ApplicationHealthIndicator.java | 4 ++-- src/main/java/de/juplo/kafka/DriverController.java | 2 +- .../de/juplo/kafka/{EndlessConsumer.java => Wordcount.java} | 2 +- 6 files changed, 12 insertions(+), 11 deletions(-) rename src/main/java/de/juplo/kafka/{EndlessConsumer.java => Wordcount.java} (98%) diff --git a/pom.xml b/pom.xml index 701704d..fe06959 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,10 @@ de.juplo.kafka - endless-consumer + wordcount 1.0-SNAPSHOT - Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic + Wordcount + Splits the incomming sentences into words and counts the words per user. diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..1d69370 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit; public class Application implements ApplicationRunner { @Autowired - EndlessConsumer endlessConsumer; + Wordcount wordcount; @Autowired ExecutorService executor; @@ -26,7 +26,7 @@ public class Application implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - endlessConsumer.start(); + wordcount.start(); } @PreDestroy @@ -35,7 +35,7 @@ public class Application implements ApplicationRunner try { log.info("Stopping EndlessConsumer"); - endlessConsumer.stop(); + wordcount.stop(); } catch (IllegalStateException e) { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c3955..a4d6e57 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -28,7 +28,7 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( + public Wordcount endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, Consumer> handler, @@ -36,7 +36,7 @@ public class ApplicationConfiguration ApplicationProperties properties) { return - new EndlessConsumer<>( + new Wordcount<>( executor, repository, properties.getClientId(), diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index df4e653..afb0413 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final Wordcount wordcount; @Override @@ -18,7 +18,7 @@ public class ApplicationHealthIndicator implements HealthIndicator { try { - return consumer + return wordcount .exitStatus() .map(Health::down) .orElse(Health.outOfService()) diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index ed38080..998ba14 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -16,7 +16,7 @@ import java.util.concurrent.ExecutionException; @RequiredArgsConstructor public class DriverController { - private final EndlessConsumer consumer; + private final Wordcount consumer; @PostMapping("start") diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/Wordcount.java similarity index 98% rename from src/main/java/de/juplo/kafka/EndlessConsumer.java rename to src/main/java/de/juplo/kafka/Wordcount.java index f9a9629..fe9958a 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/Wordcount.java @@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements ConsumerRebalanceListener, Runnable +public class Wordcount implements ConsumerRebalanceListener, Runnable { private final ExecutorService executor; private final PartitionStatisticsRepository repository; -- 2.20.1