WIP stats-on-top10
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 06:55:34 +0000 (08:55 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 06:55:34 +0000 (08:55 +0200)
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java
src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java
src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java
src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java

index 15c08ee..563f92b 100644 (file)
@@ -41,7 +41,7 @@ public class StatsStreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry))
+                               .map((key, entry) -> new KeyValue<>(WindowedKey.of(0, 0, key.getUser()), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
index 9b77cac..86d6fea 100644 (file)
@@ -10,5 +10,7 @@ import lombok.NoArgsConstructor;
 @Data
 public class WindowedKey
 {
+  long start;
+  long end;
   String user;
 }
index ada66d3..85c5840 100644 (file)
@@ -10,12 +10,11 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class InputCounter
 {
-  String user;
   String key;
   long counter;
 
   public static InputCounter of(InputWindowedKey word, long counter)
   {
-    return new InputCounter(word.getUser(), word.getKey(), counter);
+    return new InputCounter(word.getKey(), counter);
   }
 }
index 255d206..5b44cf9 100644 (file)
@@ -12,6 +12,7 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InputWindowedKey
 {
-  private String user;
+  private long start;
+  private long end;
   private String key;
 }
index f7f5679..e976c14 100644 (file)
@@ -10,6 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestEntry
 {
-  String key;
+  String word;
   long counter;
 }
index c884b06..d5898a4 100644 (file)
@@ -29,38 +29,38 @@ class TestData
        private static final KeyValue<InputWindowedKey, InputCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Hallo"),
-                                       InputCounter.of(PETER.getUser(),"Hallo",1)),
+                                       InputWindowedKey.of(0, 0,"Hallo"),
+                                       InputCounter.of("Hallo",1)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
-                                       InputCounter.of(KLAUS.getUser(),"Müsch",1)),
+                                       InputWindowedKey.of(0, 0,"Müsch"),
+                                       InputCounter.of("Müsch",1)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
-                                       InputCounter.of(PETER.getUser(),"Welt",1)),
+                                       InputWindowedKey.of(0, 0,"Welt"),
+                                       InputCounter.of("Welt",1)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
-                                       InputCounter.of(KLAUS.getUser(),"Müsch",2)),
+                                       InputWindowedKey.of(0, 0,"Müsch"),
+                                       InputCounter.of("Müsch",2)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
-                                       InputCounter.of(KLAUS.getUser(),"s",1)),
+                                       InputWindowedKey.of(0, 0,"s"),
+                                       InputCounter.of("s",1)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
-                                       InputCounter.of(PETER.getUser(),"Boäh",1)),
+                                       InputWindowedKey.of(0, 0,"Boäh"),
+                                       InputCounter.of("Boäh",1)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
-                                       InputCounter.of(PETER.getUser(),"Welt",2)),
+                                       InputWindowedKey.of(0, 0,"Welt"),
+                                       InputCounter.of("Welt",2)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
-                                       InputCounter.of(PETER.getUser(),"Boäh",2)),
+                                       InputWindowedKey.of(0, 0,"Boäh"),
+                                       InputCounter.of("Boäh",2)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
-                                       InputCounter.of(KLAUS.getUser(),"s",2)),
+                                       InputWindowedKey.of(0, 0,"s"),
+                                       InputCounter.of("s",2)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
-                                       InputCounter.of(PETER.getUser(),"Boäh",3)),
+                                       InputWindowedKey.of(0, 0,"Boäh"),
+                                       InputCounter.of("Boäh",3)),
                        new KeyValue<>(
-                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
-                                       InputCounter.of(KLAUS.getUser(),"s",3)),
+                                       InputWindowedKey.of(0, 0,"s"),
+                                       InputCounter.of("s",3)),
        };
 
        static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -79,7 +79,7 @@ class TestData
 
        private static WindowedKey userOf(TestUser user)
        {
-               return WindowedKey.of(user.getUser());
+               return WindowedKey.of(0, 0, user.getUser());
        }
 
        static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)