alt alt
authorKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 05:45:54 +0000 (06:45 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 11 Feb 2023 05:45:54 +0000 (06:45 +0100)
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/Key.java
src/main/java/de/juplo/kafka/wordcount/counter/Word.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java [new file with mode: 0644]
src/test/resources/logback-test.xml [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 1d2212c..6e1cf81 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka</artifactId>
+               </dependency>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>lombok</artifactId>
                        <optional>true</optional>
                </dependency>
+
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.awaitility</groupId>
+                       <artifactId>awaitility</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index d529541..ed54c95 100644 (file)
@@ -6,6 +6,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.springframework.kafka.support.serializer.JsonSerde;
 
 import java.util.Properties;
 
@@ -19,28 +21,27 @@ public class CounterStreamProcessor
        public CounterStreamProcessor(
                        String inputTopic,
                        String outputTopic,
-                       Properties properties)
+                       Properties properties,
+                       ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, String> source = builder.stream(properties.getInputTopic());
+               KStream<String, Word> source = builder.stream(inputTopic);
                source
-                               .map((username, word) ->
-                               {
-                                       try
-                                       {
-                                               String key = mapper.writeValueAsString(Key.of(username, word));
-                                               return new KeyValue<>(key, word);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
+                               .map((key, word) -> new KeyValue<>(Key.of(word.getUser(), word.getWord()), word))
                                .groupByKey()
                                .count()
-                               .mapValues(value->Long.toString(value))
                                .toStream()
+                               .map((key, count) -> new KeyValue<>(key, WordCount.of(key.getUser(), key.getWord(), count)))
+                               .to(
+                                               outputTopic,
+                                               Produced.with(
+                                                               new JsonSerde<>(Key.class)
+                                                                               .forKeys()
+                                                                               .noTypeInfo(),
+                                                               new JsonSerde<>(WordCount.class)
+                                                                               .noTypeInfo()));
+
                streams = new KafkaStreams(builder.build(), properties);
        }
 
index 1e00dca..137fcb2 100644 (file)
@@ -6,6 +6,6 @@ import lombok.Value;
 @Value(staticConstructor = "of")
 public class Key
 {
-  private final String username;
+  private final String user;
   private final String word;
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
new file mode 100644 (file)
index 0000000..77287d5
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java
new file mode 100644 (file)
index 0000000..44ccb2d
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class WordCount
+{
+  String user;
+  String word;
+  long count;
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..171bf63
--- /dev/null
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <include resource="org/springframework/boot/logging/logback/base.xml" />
+    <logger name="de.juplo.kafka.wordcount.counter" level="DEBUG" />
+</configuration>