top10: 1.3.0 - Refined input JSON to match the new general stats-format top10-1.3.0
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 14:09:46 +0000 (16:09 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 16:11:49 +0000 (18:11 +0200)
* Adapted the configuration to the changed type-mapping for the key.
* Refined the class `Key`, that defines the JSON for the input key.
** Added attribute `type` with value of type `enum StatsType`.
** Renamed attribute `user` to `channel`.
** Renamed attribute `word` to `key`.
* Refined the class `Entry`, that defines the JSON for the input value.
** Renamed attribute `word` to `key`.
* Adapted test-classes and -cases accordingly.

14 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
src/main/java/de/juplo/kafka/wordcount/top10/Key.java
src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java
src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java
src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java
src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

diff --git a/pom.xml b/pom.xml
index b30c4ea..e5cd268 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>top10</artifactId>
-       <version>1.2.1</version>
+       <version>1.3.0</version>
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
index b25fc07..7d00500 100644 (file)
@@ -15,6 +15,6 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Entry
 {
-  private String word;
+  private String key;
   private Long counter;
 }
index ffac8ea..aaf016c 100644 (file)
@@ -12,6 +12,7 @@ import lombok.*;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Key
 {
-  private String user;
-  private String word;
+  private StatsType type;
+  private String channel;
+  private String key;
 }
index 4f56c18..279716a 100644 (file)
@@ -49,7 +49,7 @@ public class Ranking
         for (int j = i+1; j < list.size(); j++)
         {
           entry = list.get(j);
-          if(entry.getWord().equals(newEntry.getWord()))
+          if(entry.getKey().equals(newEntry.getKey()))
           {
             list.remove(j);
             break;
@@ -63,7 +63,7 @@ public class Ranking
         return this;
       }
 
-      if (entry.getWord().equals(newEntry.getWord()))
+      if (entry.getKey().equals(newEntry.getKey()))
         oldPosition = i;
     }
 
@@ -93,12 +93,12 @@ public class Ranking
     {
       Entry entry = this.entries[i];
 
-      if (seenWords.contains(entry.getWord()))
-        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+      if (seenWords.contains(entry.getKey()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
       if (entry.getCounter() > lowesCounting)
         throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
 
-      seenWords.add(entry.getWord());
+      seenWords.add(entry.getKey());
       lowesCounting = entry.getCounter();
     }
 
@@ -128,13 +128,13 @@ public class Ranking
     Set<String> otherWordsWithCurrentCount = new HashSet<>();
     Entry myEntry = entries[i];
     long currentCount = myEntry.getCounter();
-    myWordsWithCurrentCount.add(myEntry.getWord());
+    myWordsWithCurrentCount.add(myEntry.getKey());
     while (true)
     {
       Entry otherEntry = other.entries[i];
       if (otherEntry.getCounter() != currentCount)
         return false;
-      otherWordsWithCurrentCount.add(otherEntry.getWord());
+      otherWordsWithCurrentCount.add(otherEntry.getKey());
       if (++i >= entries.length)
         return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
       myEntry = entries[i];
@@ -146,7 +146,7 @@ public class Ranking
         myWordsWithCurrentCount.clear();
         otherWordsWithCurrentCount.clear();
       }
-      myWordsWithCurrentCount.add(myEntry.getWord());
+      myWordsWithCurrentCount.add(myEntry.getKey());
     }
   }
 
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java
new file mode 100644 (file)
index 0000000..b1b8f9b
--- /dev/null
@@ -0,0 +1,7 @@
+package de.juplo.kafka.wordcount.top10;
+
+enum StatsType
+{
+  COUNTER,
+  POPULAR
+}
index 255f0e4..57e5a47 100644 (file)
@@ -55,7 +55,7 @@ public class Top10ApplicationConfiguration
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
-                               "word:" + Key.class.getName() + "," +
+                               "key:" + Key.class.getName() + "," +
                                "counter:" + Entry.class.getName() + "," +
                                "user:" + User.class.getName() + "," +
                                "ranking:" + Ranking.class.getName());
index 70ead87..907c7ff 100644 (file)
@@ -41,7 +41,7 @@ public class Top10StreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+                               .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
index d98ae64..b78c429 100644 (file)
@@ -10,12 +10,6 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class TestCounter
 {
-  String user;
-  String word;
+  String key;
   long counter;
-
-  public static TestCounter of(TestWord word, long counter)
-  {
-    return new TestCounter(word.getUser(), word.getWord(), counter);
-  }
 }
index 8008e12..00c1af7 100644 (file)
@@ -12,6 +12,7 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class TestWord
 {
-  private String user;
-  private String word;
+  private String type;
+  private String channel;
+  private String key;
 }
index a5152e6..8019da9 100644 (file)
@@ -10,6 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestEntry
 {
-  String word;
+  String key;
   long counter;
 }
index 26749e9..0f36860 100644 (file)
@@ -108,7 +108,7 @@ public class RankingTest
             Stream.of(highestEntry),
             VALID_RANKINGS[0]
                 .stream()
-                .filter(entry -> !entry.getWord().equals(word)))
+                .filter(entry -> !entry.getKey().equals(word)))
         .toList();
     assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
   }
@@ -134,7 +134,7 @@ public class RankingTest
     Ranking ranking = Ranking.of(toArray(entryList));
     entryList.forEach(entry ->
       assertThatExceptionOfType(IllegalArgumentException.class)
-          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+          .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1))));
   }
 
   @DisplayName("Identical rankings are considered equal")
index 7a3a27e..4fb229b 100644 (file)
@@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
+       static final String TYPE_COUNTER = "COUNTER";
+
        static final TestUser PETER = TestUser.of("peter");
        static final TestUser KLAUS = TestUser.of("klaus");
 
@@ -29,38 +31,38 @@ class TestData
        private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Hallo"),
-                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"),
+                                       TestCounter.of("Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of("Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",1)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+                                       TestCounter.of("Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of("Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",1)),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestCounter.of("s",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestCounter.of("Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",2)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+                                       TestCounter.of("Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestCounter.of("Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",2)),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestCounter.of("s",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
+                                       TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+                                       TestCounter.of("Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",3)),
+                                       TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+                                       TestCounter.of("s",3)),
        };
 
        static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -113,7 +115,7 @@ class TestData
                return Arrays
                                .stream(entries)
                                .map(entry -> TestEntry.of(
-                                               entry.getWord(),
+                                               entry.getKey(),
                                                entry.getCounter() == null
                                                                ? -1l
                                                                : entry.getCounter()))
index f5ef236..5f0e817 100644 (file)
@@ -36,7 +36,7 @@ import static org.awaitility.Awaitility.await;
                properties = {
                                "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
index cca9a3a..a8fc859 100644 (file)
@@ -111,7 +111,7 @@ public class Top10StreamProcessorTopologyTest
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
-            "word:" + TestWord.class.getName() + "," +
+            "key:" + TestWord.class.getName() + "," +
             "counter:" + TestCounter.class.getName()),
         isKey);
     return jsonSerializer;