counter: 1.4.0 - Refined output JSON to match the new general stats-format counter-1.4.0
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 12:22:35 +0000 (14:22 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 15:33:20 +0000 (17:33 +0200)
* Changed the type-mapping for `Word` from `word` to `key`.
* Refined the class `Word`, that defines the JSON for the output key.
** Added attribute `type` with fixed value `POPULAR`.
** Renamed attribute `user` to `channel`.
** Renamed attribute `word` to `key`.
* Refined the class `WordCounter`, that defines the JSON for the output
  value.
** Renamed attribute `word` to `key`.
* Adapted test-classes and -cases accordingly.

pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/counter/Word.java
src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java

diff --git a/pom.xml b/pom.xml
index 03a7b40..722c663 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.3.1</version>
+       <version>1.4.0</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index 2304e55..455d895 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class CounterStreamProcessor
 {
+       public static final String TYPE = "COUNTER";
        public static final String STORE_NAME = "counter";
 
 
@@ -48,6 +49,7 @@ public class CounterStreamProcessor
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+                               .mapValues(word -> Word.of(word.getUser(), word.getWord()))
                                .map((key, word) -> new KeyValue<>(word, word))
                                .groupByKey()
                                .count(
@@ -88,9 +90,9 @@ public class CounterStreamProcessor
                return new JsonSerde<>(User.class);
        }
 
-       public static JsonSerde<Word> inValueSerde()
+       public static JsonSerde<UserWord> inValueSerde()
        {
-               return new JsonSerde<>(Word.class);
+               return new JsonSerde<>(UserWord.class);
        }
 
        public static JsonSerde<Word> outKeySerde()
@@ -117,11 +119,11 @@ public class CounterStreamProcessor
                return typeMappingsConfig(Word.class, WordCounter.class);
        }
 
-       public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+       public static String typeMappingsConfig(Class keyClass, Class counterClass)
        {
                return Map.of(
-                                               "word", wordClass,
-                                               "counter", wordCounterClass)
+                                               "key", keyClass,
+                                               "counter", counterClass)
                                .entrySet()
                                .stream()
                                .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java
new file mode 100644 (file)
index 0000000..db1ccb2
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWord
+{
+  private String user;
+  private String word;
+}
index 77287d5..a058ff8 100644 (file)
@@ -1,13 +1,16 @@
 package de.juplo.kafka.wordcount.counter;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 @Data
-@JsonIgnoreProperties(ignoreUnknown = true)
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
 public class Word
 {
-  private String user;
-  private String word;
+  private final String type = CounterStreamProcessor.TYPE;
+  private String channel;
+  private String key;
 }
index f1fce71..211fa4c 100644 (file)
@@ -12,11 +12,11 @@ import lombok.NoArgsConstructor;
 public class WordCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 
   public static WordCounter of(Word word, long counter)
   {
-    return new WordCounter(word.getUser(), word.getWord(), counter);
+    return new WordCounter(word.getChannel(), word.getKey(), counter);
   }
 }
index 0faa2de..ab395fd 100644 (file)
@@ -41,7 +41,7 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
index 862eb2b..9b38dbc 100644 (file)
@@ -11,6 +11,7 @@ import org.springframework.util.MultiValueMap;
 
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
@@ -25,11 +26,11 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
-       static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
-       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
-       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
-       static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+       static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
+       static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
+       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
+       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
+       static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
 
        private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
        {
@@ -108,12 +109,9 @@ class TestData
 
        private static Word wordOf(TestOutputWord testOutputWord)
        {
-               Word word = new Word();
-
-               word.setUser(testOutputWord.getUser());
-               word.setWord(testOutputWord.getWord());
-
-               return word;
+               return Word.of(
+                               testOutputWord.getChannel(),
+                               testOutputWord.getKey());
        }
 
        static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
@@ -130,8 +128,8 @@ class TestData
                        Long counter)
        {
                TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
-                               word.getUser(),
-                               word.getWord(),
+                               word.getChannel(),
+                               word.getKey(),
                                counter);
                assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
        }
index cfc2cae..132f6ba 100644 (file)
@@ -10,6 +10,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class TestOutputWord
 {
-  String user;
-  String word;
+  String type;
+  String channel;
+  String key;
 }
index 1b59387..a5f5d43 100644 (file)
@@ -11,6 +11,6 @@ import lombok.NoArgsConstructor;
 public class TestOutputWordCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 }