From fe049944648d9675ffe66b586b6bf860fbac696f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 21 Oct 2021 21:49:51 +0200 Subject: [PATCH] counter: 1.1.0 - Only counts words (splitting is done separately now) --- pom.xml | 2 +- .../wordcount/counter/CounterApplicationProperties.java | 2 +- .../kafka/wordcount/counter/CounterStreamProcessor.java | 5 ----- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index 898fc54..4fe2235 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.0.1 + 1.1.0 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java index d670ba2..351ea18 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java @@ -15,6 +15,6 @@ public class CounterApplicationProperties { private String bootstrapServer = "localhost:9092"; private String applicationId = "counter"; - private String inputTopic = "recordings"; + private String inputTopic = "words"; private String outputTopic = "countings"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index e8d7c11..4d3bcc4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -16,10 +16,8 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -28,8 +26,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Component public class CounterStreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; @@ -42,7 +38,6 @@ public class CounterStreamProcessor KStream source = builder.stream(properties.getInputTopic()); source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) .map((username, word) -> { try -- 2.20.1