top10: 1.4.0 - Refined output JSON -- ALIGN top10-1.4.0
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 15:07:15 +0000 (17:07 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 16:55:26 +0000 (18:55 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/top10/Stats.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/TestStats.java
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

diff --git a/pom.xml b/pom.xml
index e5cd268..0dce2d1 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>top10</artifactId>
-       <version>1.3.0</version>
+       <version>1.4.0</version>
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
index 53c258d..05c2a91 100644 (file)
@@ -8,7 +8,8 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class User
+public class Stats
 {
-  String user;
+  StatsType type;
+  String channel;
 }
index 57e5a47..aecd260 100644 (file)
@@ -51,13 +51,13 @@ public class Top10ApplicationConfiguration
 
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Stats.class.getName());
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
                                "key:" + Key.class.getName() + "," +
                                "counter:" + Entry.class.getName() + "," +
-                               "user:" + User.class.getName() + "," +
+                               "stats:" + Stats.class.getName() + "," +
                                "ranking:" + Ranking.class.getName());
 
                return props;
index 907c7ff..1235132 100644 (file)
@@ -41,11 +41,13 @@ public class Top10StreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry))
+                               .map((key, entry) -> new KeyValue<>(
+                                               Stats.of(key.getType(), key.getChannel()),
+                                               entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
-                                               (user, entry, ranking) -> ranking.add(entry),
+                                               (stats, entry, ranking) -> ranking.add(entry),
                                                Materialized.as(storeSupplier))
                                .toStream()
                                .to(outputTopic);
@@ -56,7 +58,7 @@ public class Top10StreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<User, Ranking> getStore()
+       ReadOnlyKeyValueStore<Stats, Ranking> getStore()
        {
                return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
index 53a5992..6a47193 100644 (file)
@@ -8,7 +8,8 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class TestUser
+public class TestStats
 {
-  String user;
+  String type;
+  String channel;
 }
index 4fb229b..069f49a 100644 (file)
@@ -4,7 +4,7 @@ import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.query.TestEntry;
 import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
@@ -20,8 +20,8 @@ class TestData
 {
        static final String TYPE_COUNTER = "COUNTER";
 
-       static final TestUser PETER = TestUser.of("peter");
-       static final TestUser KLAUS = TestUser.of("klaus");
+       static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter");
+       static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus");
 
        static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
        {
@@ -31,83 +31,85 @@ class TestData
        private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"),
                                        TestCounter.of("Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
                                        TestCounter.of("Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
                                        TestCounter.of("Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
                                        TestCounter.of("Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
                                        TestCounter.of("s",1)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
                                        TestCounter.of("Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
                                        TestCounter.of("Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
                                        TestCounter.of("Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
                                        TestCounter.of("s",2)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
                                        TestCounter.of("Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
                                        TestCounter.of("s",3)),
        };
 
-       static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
        {
                expectedMessages().forEach(
-                               (user, rankings) ->
-                                               assertThat(receivedMessages.get(user))
+                               (stats, rankings) ->
+                                               assertThat(receivedMessages.get(stats))
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
        {
-               assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
        }
 
-       private static User userOf(TestUser user)
+       private static Stats statsOf(TestStats stats)
        {
-               return User.of(user.getUser());
+               return Stats.of(
+                               StatsType.valueOf(stats.getType()),
+                               stats.getChannel());
        }
 
-       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
        {
-               assertThat(countMessagesForUser(PETER, receivedMessages));
-               assertThat(countMessagesForUser(KLAUS, receivedMessages));
+               assertThat(countMessages(PETER, receivedMessages));
+               assertThat(countMessages(KLAUS, receivedMessages));
        }
 
-       private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
        {
-               return messagesForUsers.get(user) == null
+               return messagesFor.get(stats) == null
                                ? 0
-                               : messagesForUsers.get(user).size();
+                               : messagesFor.get(stats).size();
        }
 
 
-       static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
        {
                assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
                assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
        }
 
-       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+       private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
        {
                TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
-               assertRankingEqualsRankingFromLastMessage(user, testRanking);
+               assertRankingEqualsRankingFromLastMessage(stats, testRanking);
        }
 
        private static TestEntry[] testEntriesOf(Entry... entries)
@@ -122,25 +124,25 @@ class TestData
                                .toArray(size -> new TestEntry[size]);
        }
 
-       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+       private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
        {
-               assertThat(ranking).isEqualTo(getLastMessageFor(user));
+               assertThat(ranking).isEqualTo(getLastMessageFor(stats));
        }
 
-       private static TestRanking getLastMessageFor(TestUser user)
+       private static TestRanking getLastMessageFor(TestStats stats)
        {
-               return getLastMessageFor(user, expectedMessages());
+               return getLastMessageFor(stats, expectedMessages());
        }
 
-       private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
        {
-               return messagesForUsers
-                               .get(user)
+               return messagesFor
+                               .get(stats)
                                .stream()
                                .reduce(null, (left, right) -> right);
        }
 
-       private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
+       private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
                                        PETER,
@@ -200,9 +202,9 @@ class TestData
                                                        TestEntry.of("Müsch", 2l))),
        };
 
-       private static MultiValueMap<TestUser, TestRanking> expectedMessages()
+       private static MultiValueMap<TestStats, TestRanking> expectedMessages()
        {
-               MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<TestStats, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
index 5f0e817..51e424e 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -40,7 +40,7 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -110,32 +110,32 @@ public class Top10ApplicationIT
 
        @DisplayName("Await the expected number of messages")
        @Test
-       public void testAwaitExpectedNumberOfMessagesForUsers()
+       public void testAwaitExpectedNumberOfMessages()
        {
                await("Expected number of messages")
                                .atMost(Duration.ofSeconds(5))
                                .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
+                                               receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages)));
        }
 
        @DisplayName("Await the expected final output messages")
        @Test
-       public void testAwaitExpectedLastMessagesForUsers()
+       public void testAwaitExpectedLastMessages()
        {
                await("Expected final output messages")
                                .atMost(Duration.ofSeconds(5))
                                .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
+                                               receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages)));
        }
 
 
        static class Consumer
        {
-               private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestStats, TestRanking> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestStats user,
                                @Payload TestRanking ranking)
                {
                        log.debug("Received message: {} -> {}", user, ranking);
@@ -143,7 +143,7 @@ public class Top10ApplicationIT
                }
 
                synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
+                               java.util.function.Consumer<MultiValueMap<TestStats, TestRanking>> assertion)
                {
                        assertion.accept(received);
                }
index a8fc859..559d742 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
@@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest
   public static final String OUT = "TEST-OUT";
 
   static TopologyTestDriver testDriver;
-  static MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
+  static MultiValueMap<TestStats, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
 
 
   @BeforeAll
@@ -51,10 +51,10 @@ public class Top10StreamProcessorTopologyTest
         jsonSerializer(TestWord.class, true),
         jsonSerializer(TestCounter.class,false));
 
-    TestOutputTopic<TestUser, TestRanking> out = testDriver.createOutputTopic(
+    TestOutputTopic<TestStats, TestRanking> out = testDriver.createOutputTopic(
         OUT,
         new JsonDeserializer()
-            .copyWithType(TestUser.class)
+            .copyWithType(TestStats.class)
             .ignoreTypeHeaders(),
         new JsonDeserializer()
             .copyWithType(TestRanking.class)
@@ -79,23 +79,23 @@ public class Top10StreamProcessorTopologyTest
 
   @DisplayName("Assert the expected number of messages")
   @Test
-  public void testExpectedNumberOfMessagesForUsers()
+  public void testExpectedNumberOfMessages()
   {
-    TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
+    TestData.assertExpectedNumberOfMessages(receivedMessages);
   }
 
   @DisplayName("Assert the expected final output messages")
   @Test
-  public void testExpectedLastMessagesForUSers()
+  public void testExpectedLastMessages()
   {
-    TestData.assertExpectedLastMessagesForUsers(receivedMessages);
+    TestData.assertExpectedLastMessages(receivedMessages);
   }
 
   @DisplayName("Assert the expected state in the state-store")
   @Test
   public void testExpectedState()
   {
-    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<Stats, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
     TestData.assertExpectedState(store);
   }