]> juplo.de Git - demos/kafka/wordcount/commitdiff
popular: 1.3.0 - Refined output JSON
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 11:15:06 +0000 (13:15 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 11:15:06 +0000 (13:15 +0200)
* Added attribute `type` with fixed value `POPULAR`.
* Renamed attribute `time` to `channel`.
* Adapted test-cases accordingly.

pom.xml
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java

diff --git a/pom.xml b/pom.xml
index a21d7cce62a928dab06f444fc5b25ac15de3c5e0..bf4a105c875008f2d82fef30067a1d28e0ca7881 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>popular</artifactId>
-       <version>1.2.0</version>
+       <version>1.3.0</version>
        <name>Wordcount-Popular-Words</name>
        <description>Query stream-processor that gives access to the most popular words</description>
        <properties>
index 883e124d6c2f835a2b591cd6cc8161186ca22a33..896cb1d009e880e2ff180b1ac30a5b7111b8a9c0 100644 (file)
@@ -20,6 +20,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class PopularStreamProcessor
 {
+       public static final String TYPE = "POPULAR";
        public static final String KEY_VALUE_STORE_NAME = "popular";
        public static final String WINDOW_STORE_NAME = "popular-windows";
        public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
index b35ede6cc04444b5f8cc42121e9705cf55e4b306..475e7f1e4320c18037ee5cad5e2e72b8136ea36e 100644 (file)
@@ -10,6 +10,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class WindowedWord
 {
-  String time;
+  final String type = PopularStreamProcessor.TYPE;
+  String channel;
   String key;
 }
index 6d315751285f3409bd6d070672d647cdf772aefb..304da16fdc4cc991405e8d37693823007147c39b 100644 (file)
@@ -16,6 +16,7 @@ import java.time.ZonedDateTime;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.TYPE;
 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -26,6 +27,7 @@ class TestData
        static final Clock CLOCK = Clock.fixed(
                        Clock.systemDefaultZone().instant(),
                        Clock.systemDefaultZone().getZone());
+
        static final String PETER = "peter";
        static final String KLAUS = "klaus";
 
@@ -35,11 +37,11 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
-       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
-       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
-       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
-       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
+       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(TYPE, windowStart(), WORD_HALLO);
+       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(TYPE, windowStart(), WORD_WELT);
+       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(TYPE, windowStart(), WORD_BOÄH);
+       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(TYPE, windowStart(), WORD_MÜSCH);
+       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(TYPE, windowStart(), WORD_S);
 
        private static String windowStart()
        {
@@ -190,7 +192,7 @@ class TestData
        {
                WindowedWord windowedWord = new WindowedWord();
 
-               windowedWord.setTime(outputWindowedWord.getTime());
+               windowedWord.setChannel(outputWindowedWord.getChannel());
                windowedWord.setKey(outputWindowedWord.getKey());
 
                return windowedWord;
index ab7cc599fadb83328ea40562b5cb6586223088fe..8bec1137abb6b3f32cc220d46c775ec0e7740efb 100644 (file)
@@ -10,6 +10,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class OutputWindowedWord
 {
-  String time;
+  String type;
+  String channel;
   String key;
 }