From: Kai Moritz Date: Thu, 21 Oct 2021 19:49:51 +0000 (+0200) Subject: counter: 1.1.0 - Only counts words (splitting is done separately now) X-Git-Tag: counter-1.1.0 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=fe049944648d9675ffe66b586b6bf860fbac696f counter: 1.1.0 - Only counts words (splitting is done separately now) --- diff --git a/pom.xml b/pom.xml index 898fc54..4fe2235 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.0.1 + 1.1.0 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java index d670ba2..351ea18 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java @@ -15,6 +15,6 @@ public class CounterApplicationProperties { private String bootstrapServer = "localhost:9092"; private String applicationId = "counter"; - private String inputTopic = "recordings"; + private String inputTopic = "words"; private String outputTopic = "countings"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index e8d7c11..4d3bcc4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -16,10 +16,8 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -28,8 +26,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Component public class CounterStreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; @@ -42,7 +38,6 @@ public class CounterStreamProcessor KStream source = builder.stream(properties.getInputTopic()); source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) .map((username, word) -> { try