From: Kai Moritz Date: Thu, 21 Oct 2021 20:06:21 +0000 (+0200) Subject: splitter: 1.0.0 - splits up the recorded sentences into words X-Git-Tag: splitter-1.0.0 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fwordcount;a=commitdiff_plain;h=3bc3ef7ceb76e7643e1e373293cfdc78f870838a splitter: 1.0.0 - splits up the recorded sentences into words --- diff --git a/Dockerfile b/Dockerfile index d2218b8..803477f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM openjdk:11-jre-slim COPY target/*.jar /opt/app.jar -EXPOSE 8083 +EXPOSE 8086 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] CMD [] diff --git a/pom.xml b/pom.xml index 898fc54..54df55f 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - counter - 1.0.1 - Wordcount-Counter - Word-counting stream-processor of the multi-user wordcount-example + splitter + 1.0.0 + Wordcount-Splitter + Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java deleted file mode 100644 index 1f73d32..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; - - -@SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -public class CounterApplication -{ - public static void main(String[] args) - { - SpringApplication.run(CounterApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index d670ba2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "recordings"; - private String outputTopic = "countings"; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index e8d7c11..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,97 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Slf4j -@Component -public class CounterStreamProcessor -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - CounterApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream source = builder.stream(properties.getInputTopic()); - source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .count() - .mapValues(value->Long.toString(value)) - .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java deleted file mode 100644 index 1e00dca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Key -{ - private final String username; - private final String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java new file mode 100644 index 0000000..491c549 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.splitter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(SplitterApplicationProperties.class) +public class SplitterApplication +{ + public static void main(String[] args) + { + SpringApplication.run(SplitterApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java new file mode 100644 index 0000000..e07b7cd --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.splitter; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.splitter") +@Getter +@Setter +@ToString +public class SplitterApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "splitter"; + private String inputTopic = "recordings"; + private String outputTopic = "words"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java new file mode 100644 index 0000000..7218d6f --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -0,0 +1,77 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Slf4j +@Component +public class SplitterStreamProcessor +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + public final KafkaStreams streams; + + + public SplitterStreamProcessor( + SplitterApplicationProperties properties, + ConfigurableApplicationContext context) + { + StreamsBuilder builder = new StreamsBuilder(); + + KStream source = builder.stream(properties.getInputTopic()); + source + .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) + .to(properties.getOutputTopic()); + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + streams = new KafkaStreams(builder.build(), props); + streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index c52cd9c..3046fc3 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,2 +1,2 @@ -server.port=8083 +server.port=8086 management.endpoints.web.exposure.include=*