top10: 1.2.0 - Switched the message-key from `String` to `User` top10-1.2.0
authorKai Moritz <kai@juplo.de>
Tue, 28 May 2024 20:59:38 +0000 (22:59 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 May 2024 10:12:39 +0000 (12:12 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/top10/User.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

diff --git a/pom.xml b/pom.xml
index a8abbc8..fd71ccd 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>top10</artifactId>
-       <version>1.1.3</version>
+       <version>1.2.0</version>
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
index 7749917..6f18339 100644 (file)
@@ -33,7 +33,7 @@ public class Top10ApplicationConfiguration
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
@@ -42,6 +42,7 @@ public class Top10ApplicationConfiguration
                props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
                props.put(
                                JsonSerializer.TYPE_MAPPINGS,
+                               "user:" + User.class.getName() + "," +
                                "ranking:" + Ranking.class.getName());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
index a3900bf..d3846d8 100644 (file)
@@ -35,7 +35,7 @@ public class Top10StreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
+                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java
new file mode 100644 (file)
index 0000000..53c258d
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+  String user;
+}
index 73a405e..3bb6b54 100644 (file)
@@ -55,7 +55,7 @@ class TestData
                                        TestCounter.of("klaus","s",3)),
        };
 
-       static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
        {
                expectedMessages().forEach(
                                (user, rankings) ->
@@ -63,69 +63,69 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+       static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 1
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 1l))),
                        KeyValue.pair( // 2
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l))),
                        KeyValue.pair( // 3
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l))),
                        KeyValue.pair( // 4
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 1l))),
                        KeyValue.pair( // 5
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 6
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 7
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Boäh", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 8
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 2l))),
                        KeyValue.pair( // 9
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Boäh", 3l),
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 10
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("s", 3l),
                                                        Entry.of("Müsch", 2l))),
        };
 
-       static MultiValueMap<String, Ranking> expectedMessages()
+       static MultiValueMap<User, Ranking> expectedMessages()
        {
-               MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
index 86314e5..01c1cf6 100644 (file)
@@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest
 
   TopologyTestDriver testDriver;
   TestInputTopic<Key, Entry> in;
-  TestOutputTopic<String, Ranking> out;
+  TestOutputTopic<User, Ranking> out;
 
 
   @BeforeEach
@@ -61,7 +61,7 @@ public class Top10StreamProcessorTopologyTest
 
     out = testDriver.createOutputTopic(
         OUT,
-        (JsonDeserializer<String>)keySerde.deserializer(),
+        (JsonDeserializer<User>)keySerde.deserializer(),
         (JsonDeserializer<Ranking>)valueSerde.deserializer());
 
   }
@@ -76,7 +76,7 @@ public class Top10StreamProcessorTopologyTest
             Key.of(kv.key.getUser(), kv.key.getWord()),
             Entry.of(kv.value.getWord(), kv.value.getCounter())));
 
-    MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+    MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
     out
         .readRecordsToList()
         .forEach(record ->