--- /dev/null
+*
+!target/*.jar
--- /dev/null
+target/
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
--- /dev/null
+FROM openjdk:11-jre-slim
+COPY target/*.jar /opt/app.jar
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
+CMD []
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.5.4</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+ <groupId>de.juplo.kafka.wordcount</groupId>
+ <artifactId>counter</artifactId>
+ <version>1.0.0</version>
+ <name>Wordcount-Counter</name>
+ <description>Word-counting stream-processor of the multi-user wordcount-example</description>
+ <properties>
+ <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
+ <java.version>11</java.version>
+ <kafka.version>2.8.0</kafka.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-devtools</artifactId>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker-maven-plugin.version}</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/wordcount--%a:%v</name>
+ </image>
+ </images>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+public class CounterApplication
+{
+ public static void main(String[] args)
+ {
+ SpringApplication.run(CounterApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String applicationId = "counter";
+ private String inputTopic = "recordings";
+ private String outputTopic = "countings";
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class CounterStreamProcessor
+{
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+ public final KafkaStreams streams;
+
+
+ public CounterStreamProcessor(
+ CounterApplicationProperties properties,
+ ObjectMapper mapper,
+ ConfigurableApplicationContext context)
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream<String, String> source = builder.stream(properties.getInputTopic());
+ source
+ .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence)))
+ .map((username, word) ->
+ {
+ try
+ {
+ String key = mapper.writeValueAsString(Key.of(username, word));
+ return new KeyValue<>(key, word);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .groupByKey()
+ .count()
+ .mapValues(value->Long.toString(value))
+ .toStream()
+ .to(properties.getOutputTopic());
+
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ streams = new KafkaStreams(builder.build(), props);
+ streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+ }
+
+ @PostConstruct
+ public void start()
+ {
+ log.info("Starting Stream-Processor");
+ streams.start();
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ log.info("Stopping Stream-Processor");
+ streams.close();
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class Key
+{
+ private final String username;
+ private final String word;
+}