query: 2.1.0 - Refined input JSON (adapted to general format for stats) query-2.1.0
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 17:05:45 +0000 (19:05 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 23 Jun 2024 06:53:34 +0000 (08:53 +0200)
* Adapted the app to the new type-mapping `stats` for the incomming keys.
* Refined the class `Key`, that defines the JSON for the incomming key.
** Renamed attribute `user` to `channel`.
** Added attribute `type` of type `String`.
* Refined the class `Entry`, that defines the JSON of an entry in the
  ranking, that is defined in the class `Ranking`.
** Renamed attribute `word` to `key`.
* The `QueryStreamProcessor` filters the incomming messages by the field
  `type` of the `Key`: all messages are dropped, that are not of type
  `COUNTER`.

pom.xml
src/main/java/de/juplo/kafka/wordcount/query/Entry.java
src/main/java/de/juplo/kafka/wordcount/query/Key.java
src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java
src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java

diff --git a/pom.xml b/pom.xml
index 707d82d..ff775f7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>query</artifactId>
-       <version>2.0.1</version>
+       <version>2.1.0</version>
        <name>Wordcount-Query</name>
        <description>Query stream-processor of the multi-user wordcount-example</description>
        <properties>
index 4be314c..383b1a6 100644 (file)
@@ -6,6 +6,6 @@ import lombok.Data;
 @Data
 public class Entry
 {
-  private String word;
+  private String key;
   private Long counter;
 }
index 57d095a..a2d85a1 100644 (file)
@@ -6,5 +6,6 @@ import lombok.Data;
 @Data
 public class Key
 {
-  private String user;
+  private String type;
+  private String channel;
 }
index 440d5c4..0f9cad1 100644 (file)
@@ -89,7 +89,7 @@ public class QueryApplicationConfiguration
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
-                               "user:" + Key.class.getName() + "," +
+                               "stats:" + Key.class.getName() + "," +
                                "ranking:" + Ranking.class.getName() + "," +
                                "userranking:" + UserRanking.class.getName());
 
index e075eb7..5543a91 100644 (file)
@@ -20,6 +20,7 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
+       public static final String STATS_TYPE = "COUNTER";
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
@@ -65,7 +66,8 @@ public class QueryStreamProcessor
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder
                                .<Key, Ranking>stream(rankingInputTopic)
-                               .map((key, value) -> new KeyValue<>(key.getUser(), value));
+                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+                               .map((key, value) -> new KeyValue<>(key.getChannel(), value));
 
                rankings
                                .join(users, (ranking, user) -> UserRanking.of(
index 1315eae..fb12aee 100644 (file)
@@ -153,7 +153,7 @@ public class QueryApplicationIT
                        Map<String, Object> properties = Map.of(
                                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+                                       JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
                        return new KafkaTemplate(producerFactory, properties);
                }
 
index 203c813..fbeb19b 100644 (file)
@@ -83,7 +83,7 @@ public class QueryStreamProcessorTopologyTest
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
-            "user:" + TestUser.class.getName() + "," +
+            "stats:" + TestUser.class.getName() + "," +
             "ranking:" + TestRanking.class.getName()),
         isKey);
     return jsonSerializer;
index f5b8a00..44162a0 100644 (file)
@@ -10,13 +10,14 @@ import java.util.Arrays;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
 class TestData
 {
-       static final TestUser PETER = TestUser.of("peter");
-       static final TestUser KLAUS = TestUser.of("klaus");
+       static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
+       static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
 
        static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
        {
@@ -30,8 +31,8 @@ class TestData
 
        static void assertExpectedState(Function<String, UserRanking> function)
        {
-               assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser()));
-               assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser()));
+               assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
+               assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
        }
 
        private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
@@ -42,7 +43,7 @@ class TestData
        private static UserRanking getLastMessageFor(String user)
        {
                return getTop10Messages()
-                               .filter(kv -> kv.key.getUser().equals(user))
+                               .filter(kv -> kv.key.getChannel().equals(user))
                                .map(kv -> kv.value)
                                .map(testRanking -> userRankingFor(user, testRanking))
                                .reduce(null, (left, right) -> right);
@@ -69,7 +70,7 @@ class TestData
        private static Entry entryOf(TestEntry testEntry)
        {
                Entry entry = new Entry();
-               entry.setWord(testEntry.getWord());
+               entry.setKey(testEntry.getKey());
                entry.setCounter(testEntry.getCounter());
                return entry;
        }
@@ -136,10 +137,10 @@ class TestData
        private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER.getUser(),
-                                       TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)),
+                                       PETER.getChannel(),
+                                       TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
                        KeyValue.pair(
-                                       KLAUS.getUser(),
-                                       TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+                                       KLAUS.getChannel(),
+                                       TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
        };
 }
index ceafa82..15a8aa4 100644 (file)
@@ -10,6 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestEntry
 {
-  String word;
+  String key;
   long counter;
 }
index cc63c34..e58786a 100644 (file)
@@ -10,5 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestUser
 {
-  String user;
+  String type;
+  String channel;
 }