From 3bc3ef7ceb76e7643e1e373293cfdc78f870838a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 21 Oct 2021 22:06:21 +0200 Subject: [PATCH] splitter: 1.0.0 - splits up the recorded sentences into words --- Dockerfile | 2 +- pom.xml | 8 +++--- .../de/juplo/kafka/wordcount/counter/Key.java | 11 -------- .../SplitterApplication.java} | 8 +++--- .../SplitterApplicationProperties.java} | 10 +++---- .../SplitterStreamProcessor.java} | 28 +++---------------- src/main/resources/application.properties | 2 +- 7 files changed, 19 insertions(+), 50 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Key.java rename src/main/java/de/juplo/kafka/wordcount/{counter/CounterApplication.java => splitter/SplitterApplication.java} (57%) rename src/main/java/de/juplo/kafka/wordcount/{counter/CounterApplicationProperties.java => splitter/SplitterApplicationProperties.java} (55%) rename src/main/java/de/juplo/kafka/wordcount/{counter/CounterStreamProcessor.java => splitter/SplitterStreamProcessor.java} (76%) diff --git a/Dockerfile b/Dockerfile index d2218b8..803477f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM openjdk:11-jre-slim COPY target/*.jar /opt/app.jar -EXPOSE 8083 +EXPOSE 8086 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] CMD [] diff --git a/pom.xml b/pom.xml index 898fc54..54df55f 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - counter - 1.0.1 - Wordcount-Counter - Word-counting stream-processor of the multi-user wordcount-example + splitter + 1.0.0 + Wordcount-Splitter + Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java deleted file mode 100644 index 1e00dca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Key -{ - private final String username; - private final String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java similarity index 57% rename from src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java rename to src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 1f73d32..491c549 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.splitter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -6,11 +6,11 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties @SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -public class CounterApplication +@EnableConfigurationProperties(SplitterApplicationProperties.class) +public class SplitterApplication { public static void main(String[] args) { - SpringApplication.run(CounterApplication.class, args); + SpringApplication.run(SplitterApplication.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java similarity index 55% rename from src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java rename to src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java index d670ba2..e07b7cd 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.splitter; import lombok.Getter; @@ -7,14 +7,14 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("juplo.wordcount.counter") +@ConfigurationProperties("juplo.wordcount.splitter") @Getter @Setter @ToString -public class CounterApplicationProperties +public class SplitterApplicationProperties { private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; + private String applicationId = "splitter"; private String inputTopic = "recordings"; - private String outputTopic = "countings"; + private String outputTopic = "words"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java similarity index 76% rename from src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java rename to src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index e8d7c11..7218d6f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -1,12 +1,9 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.splitter; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @@ -26,16 +23,15 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j @Component -public class CounterStreamProcessor +public class SplitterStreamProcessor { final static Pattern PATTERN = Pattern.compile("\\W+"); public final KafkaStreams streams; - public CounterStreamProcessor( - CounterApplicationProperties properties, - ObjectMapper mapper, + public SplitterStreamProcessor( + SplitterApplicationProperties properties, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); @@ -43,22 +39,6 @@ public class CounterStreamProcessor KStream source = builder.stream(properties.getInputTopic()); source .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .count() - .mapValues(value->Long.toString(value)) - .toStream() .to(properties.getOutputTopic()); Properties props = new Properties(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index c52cd9c..3046fc3 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,2 +1,2 @@ -server.port=8083 +server.port=8086 management.endpoints.web.exposure.include=* -- 2.20.1