From 37c78ce13c2c8b6653704177bd4284e8aed94166 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 13:15:06 +0200 Subject: [PATCH] popular: 1.3.0 - Refined output JSON * Added attribute `type` with fixed value `POPULAR`. * Renamed attribute `time` to `channel`. * Adapted test-cases accordingly. --- pom.xml | 2 +- .../wordcount/popular/PopularStreamProcessor.java | 1 + .../kafka/wordcount/popular/WindowedWord.java | 3 ++- .../de/juplo/kafka/wordcount/popular/TestData.java | 14 ++++++++------ .../kafka/wordcount/stats/OutputWindowedWord.java | 3 ++- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index a21d7cc..bf4a105 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount popular - 1.2.0 + 1.3.0 Wordcount-Popular-Words Query stream-processor that gives access to the most popular words diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 883e124..896cb1d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -20,6 +20,7 @@ import java.util.stream.Collectors; @Slf4j public class PopularStreamProcessor { + public static final String TYPE = "POPULAR"; public static final String KEY_VALUE_STORE_NAME = "popular"; public static final String WINDOW_STORE_NAME = "popular-windows"; public static final Duration WINDOW_SIZE = Duration.ofSeconds(30); diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java index b35ede6..475e7f1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java @@ -10,6 +10,7 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class WindowedWord { - String time; + final String type = PopularStreamProcessor.TYPE; + String channel; String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 6d31575..304da16 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -16,6 +16,7 @@ import java.time.ZonedDateTime; import java.util.function.BiConsumer; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.TYPE; import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE; import static org.assertj.core.api.Assertions.assertThat; @@ -26,6 +27,7 @@ class TestData static final Clock CLOCK = Clock.fixed( Clock.systemDefaultZone().instant(), Clock.systemDefaultZone().getZone()); + static final String PETER = "peter"; static final String KLAUS = "klaus"; @@ -35,11 +37,11 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO); - static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT); - static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH); - static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH); - static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S); + static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(TYPE, windowStart(), WORD_HALLO); + static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(TYPE, windowStart(), WORD_WELT); + static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(TYPE, windowStart(), WORD_BOÄH); + static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(TYPE, windowStart(), WORD_MÜSCH); + static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(TYPE, windowStart(), WORD_S); private static String windowStart() { @@ -190,7 +192,7 @@ class TestData { WindowedWord windowedWord = new WindowedWord(); - windowedWord.setTime(outputWindowedWord.getTime()); + windowedWord.setChannel(outputWindowedWord.getChannel()); windowedWord.setKey(outputWindowedWord.getKey()); return windowedWord; diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java index ab7cc59..8bec113 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java @@ -10,6 +10,7 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class OutputWindowedWord { - String time; + String type; + String channel; String key; } -- 2.20.1