From 02ab54c6b3c099f5b0bd420fc0a37034badf1c71 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Wed, 1 Sep 2021 22:31:01 +0200
Subject: [PATCH] wordcount:1.0.0 - counts words, keyed by username and word

---
 .dockerignore                                 |  2 +
 .gitignore                                    | 31 ++++++
 Dockerfile                                    |  5 +
 pom.xml                                       | 87 +++++++++++++++++
 .../wordcount/counter/CounterApplication.java | 16 +++
 .../counter/CounterApplicationProperties.java | 20 ++++
 .../counter/CounterStreamProcessor.java       | 97 +++++++++++++++++++
 .../de/juplo/kafka/wordcount/counter/Key.java | 11 +++
 src/main/resources/application.properties     |  1 +
 9 files changed, 270 insertions(+)
 create mode 100644 .dockerignore
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 pom.xml
 create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
 create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
 create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
 create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Key.java
 create mode 100644 src/main/resources/application.properties

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..1ad9963
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7339146
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,31 @@
+target/
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..bbd15ef
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre-slim
+COPY target/*.jar /opt/app.jar
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
+CMD []
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..71115c0
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.5.4</version>
+		<relativePath/> <!-- lookup parent from repository -->
+	</parent>
+	<groupId>de.juplo.kafka.wordcount</groupId>
+	<artifactId>counter</artifactId>
+	<version>1.0.0</version>
+	<name>Wordcount-Counter</name>
+	<description>Word-counting stream-processor of the multi-user wordcount-example</description>
+	<properties>
+		<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+		<java.version>11</java.version>
+		<kafka.version>2.8.0</kafka.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-actuator</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-streams</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-devtools</artifactId>
+			<scope>runtime</scope>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-configuration-processor</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+				<configuration>
+					<excludes>
+						<exclude>
+							<groupId>org.projectlombok</groupId>
+							<artifactId>lombok</artifactId>
+						</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>io.fabric8</groupId>
+				<artifactId>docker-maven-plugin</artifactId>
+				<version>${docker-maven-plugin.version}</version>
+				<configuration>
+					<images>
+						<image>
+							<name>juplo/wordcount--%a:%v</name>
+						</image>
+					</images>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
new file mode 100644
index 0000000..1f73d32
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+public class CounterApplication
+{
+	public static void main(String[] args)
+	{
+		SpringApplication.run(CounterApplication.class, args);
+	}
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
new file mode 100644
index 0000000..d670ba2
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
@@ -0,0 +1,20 @@
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "counter";
+  private String inputTopic = "recordings";
+  private String outputTopic = "countings";
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
new file mode 100644
index 0000000..e8d7c11
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@ -0,0 +1,97 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class CounterStreamProcessor
+{
+	final static Pattern PATTERN = Pattern.compile("\\W+");
+
+	public final KafkaStreams streams;
+
+
+	public CounterStreamProcessor(
+			CounterApplicationProperties properties,
+			ObjectMapper mapper,
+			ConfigurableApplicationContext context)
+	{
+		StreamsBuilder builder = new StreamsBuilder();
+
+		KStream<String, String> source = builder.stream(properties.getInputTopic());
+		source
+				.flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence)))
+				.map((username, word) ->
+				{
+					try
+					{
+						String key = mapper.writeValueAsString(Key.of(username, word));
+						return new KeyValue<>(key, word);
+					}
+					catch (JsonProcessingException e)
+					{
+						throw new RuntimeException(e);
+					}
+				})
+				.groupByKey()
+				.count()
+				.mapValues(value->Long.toString(value))
+				.toStream()
+				.to(properties.getOutputTopic());
+
+		Properties props = new Properties();
+		props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+		streams = new KafkaStreams(builder.build(), props);
+		streams.setUncaughtExceptionHandler((Throwable e) ->
+		{
+			log.error("Unexpected error!", e);
+			CompletableFuture.runAsync(() ->
+			{
+				log.info("Stopping application...");
+				SpringApplication.exit(context, () -> 1);
+			});
+			return SHUTDOWN_CLIENT;
+		});
+	}
+
+	@PostConstruct
+	public void start()
+	{
+		log.info("Starting Stream-Processor");
+		streams.start();
+	}
+
+	@PreDestroy
+	public void stop()
+	{
+		log.info("Stopping Stream-Processor");
+		streams.close();
+	}
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
new file mode 100644
index 0000000..1e00dca
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class Key
+{
+  private final String username;
+  private final String word;
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/main/resources/application.properties
@@ -0,0 +1 @@
+
-- 
2.20.1