WIP
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:50:15 +0000 (22:50 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:14:14 +0000 (23:14 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Counter.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/top10/Word.java
src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java

diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java
new file mode 100644 (file)
index 0000000..3dac384
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.top10;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Counter
+{
+  String user;
+  String word;
+  long counter;
+}
index 63357e8..b669344 100644 (file)
@@ -24,10 +24,10 @@ public class Top10StreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<Word, WordCount>stream(inputTopic)
-                               .map((word, wordCount) ->
+                               .<Word, Counter>stream(inputTopic)
+                               .map((word, counter) ->
                                {
-                                       Entry entry = Entry.of(word.getWord(), wordCount.getCount());
+                                       Entry entry = Entry.of(word.getWord(), counter.getCounter());
                                        return new KeyValue<>(word.getUser(), entry);
                                })
                                .groupByKey()
index e4662bc..3ee1bb0 100644 (file)
@@ -1,11 +1,13 @@
 package de.juplo.kafka.wordcount.top10;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 
 @NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Word
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/top10/WordCount.java
deleted file mode 100644 (file)
index 8530f49..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
-    staticName = "of",
-    access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class WordCount
-{
-  String user;
-  String word;
-  long count;
-}
index dd7ad33..f521869 100644 (file)
@@ -1,51 +1,58 @@
 package de.juplo.kafka.wordcount.top10;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 
 class TestData
 {
-       static void writeInputData(BiConsumer<String, String> consumer)
+       static void writeInputData(BiConsumer<Word, Counter> consumer)
        {
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Hallo\"}");
+                               Word.of("peter","Hallo"),
+                               Counter.of("peter","Hallo",1));
                consumer.accept(
-                               "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"),
+                               Counter.of("klaus","Müsch",1));
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"),
+                               Counter.of("peter","Welt",1));
                consumer.accept(
-                               "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"),
+                               Counter.of("klaus","Müsch",2));
                consumer.accept(
-                               "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"),
+                               Counter.of("klaus","s",1));
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"),
+                               Counter.of("peter","Boäh",1));
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"),
+                               Counter.of("peter","Welt",2));
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"),
+                               Counter.of("peter","Boäh",2));
                consumer.accept(
-                               "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"),
+                               Counter.of("klaus","s",2));
                consumer.accept(
-                               "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"),
+                               Counter.of("peter","Boäh",3));
                consumer.accept(
-                               "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"),
+                               Counter.of("klaus","s",3));
        }
 
-       static void assertExpectedResult(List<Message> receivedMessages)
+       static void assertExpectedResult(List<KeyValue<Word, Counter>> receivedMessages)
        {
                assertThat(receivedMessages).hasSize(11);
                assertThat(receivedMessages).containsSubsequence(
@@ -66,40 +73,65 @@ class TestData
                                expectedMessages[9]); // Boäh
        }
 
-       static Message[] expectedMessages =
+       static KeyValue<Word, Counter>[] expectedMessages = new KeyValue[]
+                       {
+                                       KeyValue.pair(
+                                                       Word.of("peter","Hallo"),
+                                                       Counter.of("peter","Hallo",1)),
+                                       KeyValue.pair(
+                                                       Word.of("klaus","Müsch"),
+                                                       Counter.of("klaus","Müsch",1)),
+                                       KeyValue.pair(
+                                                       Word.of("peter","Welt"),
+                                                       Counter.of("peter","Welt",1)),
+                                       KeyValue.pair(
+                                                       Word.of("klaus","Müsch"),
+                                                       Counter.of("klaus","Müsch",2)),
+                                       KeyValue.pair(
+                                                       Word.of("klaus","s"),
+                                                       Counter.of("klaus","s",1)),
+                                       KeyValue.pair(
+                                                       Word.of("peter","Boäh"),
+                                                       Counter.of("peter","Boäh",1)),
+                                       KeyValue.pair(
+                                                       Word.of("peter","Welt"),
+                                                       Counter.of("peter","Welt",2)),
+                                       KeyValue.pair(
+                                                       Word.of("peter","Boäh"),
+                                                       Counter.of("peter","Boäh",2)),
+                                       KeyValue.pair(
+                                                       Word.of("klaus","s"),
+                                                       Counter.of("klaus","s",2)),
+                                       KeyValue.pair(
+                                                       Word.of("peter","Boäh"),
+                                                       Counter.of("peter","Boäh",3)),
+                                       KeyValue.pair(
+                                                       Word.of("klaus","s"),
+                                                       Counter.of("klaus","s",3)),
+                       };
+
+       static Map<String, Object> convertToMap(Properties properties)
+       {
+               return properties
+                               .entrySet()
+                               .stream()
+                               .collect(
+                                               Collectors.toMap(
+                                                               entry -> (String)entry.getKey(),
+                                                               entry -> entry.getValue()
+                                               ));
+       }
+
+       static String parseHeader(Headers headers, String key)
        {
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Welt\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"),
-                       Message.of(
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\"}",
-                                       "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"),
-                       Message.of(
-                                       "{\"user\":\"klaus\",\"word\":\"s\"}",
-                                       "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"),
-       };
+               Header header = headers.lastHeader(key);
+               if (header == null)
+               {
+                       return key + "=null";
+               }
+               else
+               {
+                       return key + "=" + new String(header.value());
+               }
+       }
 }