wordcount:1.0.0 - counts words, keyed by username and word counter-1.0.0
authorKai Moritz <kai@juplo.de>
Wed, 1 Sep 2021 20:31:01 +0000 (22:31 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Oct 2021 18:44:13 +0000 (20:44 +0200)
.dockerignore [new file with mode: 0644]
.gitignore [new file with mode: 0644]
Dockerfile [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/Key.java [new file with mode: 0644]
src/main/resources/application.properties [new file with mode: 0644]

diff --git a/.dockerignore b/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..7339146
--- /dev/null
@@ -0,0 +1,31 @@
+target/
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..bbd15ef
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre-slim
+COPY target/*.jar /opt/app.jar
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
+CMD []
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..71115c0
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.springframework.boot</groupId>
+               <artifactId>spring-boot-starter-parent</artifactId>
+               <version>2.5.4</version>
+               <relativePath/> <!-- lookup parent from repository -->
+       </parent>
+       <groupId>de.juplo.kafka.wordcount</groupId>
+       <artifactId>counter</artifactId>
+       <version>1.0.0</version>
+       <name>Wordcount-Counter</name>
+       <description>Word-counting stream-processor of the multi-user wordcount-example</description>
+       <properties>
+               <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+               <java.version>11</java.version>
+               <kafka.version>2.8.0</kafka.version>
+       </properties>
+       <dependencies>
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-actuator</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-web</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-streams</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-devtools</artifactId>
+                       <scope>runtime</scope>
+                       <optional>true</optional>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-configuration-processor</artifactId>
+                       <optional>true</optional>
+               </dependency>
+               <dependency>
+                       <groupId>org.projectlombok</groupId>
+                       <artifactId>lombok</artifactId>
+                       <optional>true</optional>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.springframework.boot</groupId>
+                               <artifactId>spring-boot-maven-plugin</artifactId>
+                               <configuration>
+                                       <excludes>
+                                               <exclude>
+                                                       <groupId>org.projectlombok</groupId>
+                                                       <artifactId>lombok</artifactId>
+                                               </exclude>
+                                       </excludes>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>io.fabric8</groupId>
+                               <artifactId>docker-maven-plugin</artifactId>
+                               <version>${docker-maven-plugin.version}</version>
+                               <configuration>
+                                       <images>
+                                               <image>
+                                                       <name>juplo/wordcount--%a:%v</name>
+                                               </image>
+                                       </images>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
new file mode 100644 (file)
index 0000000..1f73d32
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+public class CounterApplication
+{
+       public static void main(String[] args)
+       {
+               SpringApplication.run(CounterApplication.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
new file mode 100644 (file)
index 0000000..d670ba2
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "counter";
+  private String inputTopic = "recordings";
+  private String outputTopic = "countings";
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
new file mode 100644 (file)
index 0000000..e8d7c11
--- /dev/null
@@ -0,0 +1,97 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class CounterStreamProcessor
+{
+       final static Pattern PATTERN = Pattern.compile("\\W+");
+
+       public final KafkaStreams streams;
+
+
+       public CounterStreamProcessor(
+                       CounterApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+
+               KStream<String, String> source = builder.stream(properties.getInputTopic());
+               source
+                               .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence)))
+                               .map((username, word) ->
+                               {
+                                       try
+                                       {
+                                               String key = mapper.writeValueAsString(Key.of(username, word));
+                                               return new KeyValue<>(key, word);
+                                       }
+                                       catch (JsonProcessingException e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               })
+                               .groupByKey()
+                               .count()
+                               .mapValues(value->Long.toString(value))
+                               .toStream()
+                               .to(properties.getOutputTopic());
+
+               Properties props = new Properties();
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               streams = new KafkaStreams(builder.build(), props);
+               streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+       }
+
+       @PostConstruct
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       @PreDestroy
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
new file mode 100644 (file)
index 0000000..1e00dca
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class Key
+{
+  private final String username;
+  private final String word;
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644 (file)
index 0000000..8b13789
--- /dev/null
@@ -0,0 +1 @@
+