From: Kai Moritz Date: Sat, 22 Jun 2024 06:55:34 +0000 (+0200) Subject: WIP X-Git-Tag: stats-on-top10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a179b0e5fd981a74dc9d6c0bff34a39d90b06fff;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java index 15c08ee..563f92b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java @@ -41,7 +41,7 @@ public class StatsStreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry)) + .map((key, entry) -> new KeyValue<>(WindowedKey.of(0, 0, key.getUser()), entry)) .groupByKey() .aggregate( () -> new Ranking(), diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java index 9b77cac..86d6fea 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java @@ -10,5 +10,7 @@ import lombok.NoArgsConstructor; @Data public class WindowedKey { + long start; + long end; String user; } diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java index ada66d3..85c5840 100644 --- a/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java @@ -10,12 +10,11 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class InputCounter { - String user; String key; long counter; public static InputCounter of(InputWindowedKey word, long counter) { - return new InputCounter(word.getUser(), word.getKey(), counter); + return new InputCounter(word.getKey(), counter); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java index 255d206..5b44cf9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java @@ -12,6 +12,7 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class InputWindowedKey { - private String user; + private long start; + private long end; private String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java index f7f5679..e976c14 100644 --- a/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String key; + String word; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java index c884b06..d5898a4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java @@ -29,38 +29,38 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Hallo"), - InputCounter.of(PETER.getUser(),"Hallo",1)), + InputWindowedKey.of(0, 0,"Hallo"), + InputCounter.of("Hallo",1)), new KeyValue<>( - InputWindowedKey.of(KLAUS.getUser(),"Müsch"), - InputCounter.of(KLAUS.getUser(),"Müsch",1)), + InputWindowedKey.of(0, 0,"Müsch"), + InputCounter.of("Müsch",1)), new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Welt"), - InputCounter.of(PETER.getUser(),"Welt",1)), + InputWindowedKey.of(0, 0,"Welt"), + InputCounter.of("Welt",1)), new KeyValue<>( - InputWindowedKey.of(KLAUS.getUser(),"Müsch"), - InputCounter.of(KLAUS.getUser(),"Müsch",2)), + InputWindowedKey.of(0, 0,"Müsch"), + InputCounter.of("Müsch",2)), new KeyValue<>( - InputWindowedKey.of(KLAUS.getUser(),"s"), - InputCounter.of(KLAUS.getUser(),"s",1)), + InputWindowedKey.of(0, 0,"s"), + InputCounter.of("s",1)), new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Boäh"), - InputCounter.of(PETER.getUser(),"Boäh",1)), + InputWindowedKey.of(0, 0,"Boäh"), + InputCounter.of("Boäh",1)), new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Welt"), - InputCounter.of(PETER.getUser(),"Welt",2)), + InputWindowedKey.of(0, 0,"Welt"), + InputCounter.of("Welt",2)), new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Boäh"), - InputCounter.of(PETER.getUser(),"Boäh",2)), + InputWindowedKey.of(0, 0,"Boäh"), + InputCounter.of("Boäh",2)), new KeyValue<>( - InputWindowedKey.of(KLAUS.getUser(),"s"), - InputCounter.of(KLAUS.getUser(),"s",2)), + InputWindowedKey.of(0, 0,"s"), + InputCounter.of("s",2)), new KeyValue<>( - InputWindowedKey.of(PETER.getUser(),"Boäh"), - InputCounter.of(PETER.getUser(),"Boäh",3)), + InputWindowedKey.of(0, 0,"Boäh"), + InputCounter.of("Boäh",3)), new KeyValue<>( - InputWindowedKey.of(KLAUS.getUser(),"s"), - InputCounter.of(KLAUS.getUser(),"s",3)), + InputWindowedKey.of(0, 0,"s"), + InputCounter.of("s",3)), }; static void assertExpectedMessages(MultiValueMap receivedMessages) @@ -79,7 +79,7 @@ class TestData private static WindowedKey userOf(TestUser user) { - return WindowedKey.of(user.getUser()); + return WindowedKey.of(0, 0, user.getUser()); } static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages)