From: Kai Moritz <kai@juplo.de>
Date: Sat, 11 Feb 2023 05:45:54 +0000 (+0100)
Subject: alt
X-Git-Tag: alt
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=6536eb76056262f388580f8762cdfbbf4275d5b2;p=demos%2Fkafka%2Fwordcount

alt
---

diff --git a/pom.xml b/pom.xml
index 1d2212c..6e1cf81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,10 @@
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-streams</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka</artifactId>
+		</dependency>
 
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
@@ -46,11 +50,27 @@
 			<artifactId>lombok</artifactId>
 			<optional>true</optional>
 		</dependency>
+
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.awaitility</groupId>
+			<artifactId>awaitility</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
index d529541..ed54c95 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@ -6,6 +6,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.util.Properties;
 
@@ -19,28 +21,27 @@ public class CounterStreamProcessor
 	public CounterStreamProcessor(
 			String inputTopic,
 			String outputTopic,
-			Properties properties)
+			Properties properties,
+			ObjectMapper mapper)
 	{
 		StreamsBuilder builder = new StreamsBuilder();
 
-		KStream<String, String> source = builder.stream(properties.getInputTopic());
+		KStream<String, Word> source = builder.stream(inputTopic);
 		source
-				.map((username, word) ->
-				{
-					try
-					{
-						String key = mapper.writeValueAsString(Key.of(username, word));
-						return new KeyValue<>(key, word);
-					}
-					catch (JsonProcessingException e)
-					{
-						throw new RuntimeException(e);
-					}
-				})
+				.map((key, word) -> new KeyValue<>(Key.of(word.getUser(), word.getWord()), word))
 				.groupByKey()
 				.count()
-				.mapValues(value->Long.toString(value))
 				.toStream()
+				.map((key, count) -> new KeyValue<>(key, WordCount.of(key.getUser(), key.getWord(), count)))
+				.to(
+						outputTopic,
+						Produced.with(
+								new JsonSerde<>(Key.class)
+										.forKeys()
+										.noTypeInfo(),
+								new JsonSerde<>(WordCount.class)
+										.noTypeInfo()));
+
 		streams = new KafkaStreams(builder.build(), properties);
 	}
 
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
index 1e00dca..137fcb2 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
@@ -6,6 +6,6 @@ import lombok.Value;
 @Value(staticConstructor = "of")
 public class Key
 {
-  private final String username;
+  private final String user;
   private final String word;
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
new file mode 100644
index 0000000..77287d5
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
new file mode 100644
index 0000000..44ccb2d
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class WordCount
+{
+  String user;
+  String word;
+  long count;
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..171bf63
--- /dev/null
+++ b/src/test/resources/logback-test.xml
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <include resource="org/springframework/boot/logging/logback/base.xml" />
+    <logger name="de.juplo.kafka.wordcount.counter" level="DEBUG" />
+</configuration>