popular: 1.2.0 - Refined `WindowedWord` (timestamp as string) popular-1.2.0
authorKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 09:23:25 +0000 (11:23 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 11:12:36 +0000 (13:12 +0200)
* `WindowedWord` only stores the start-time of the window.
* `WindowedWord` stores the seconds-since-epoce as string.

src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java

index 45a0a57..2cbe07a 100644 (file)
@@ -14,7 +14,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
-import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -62,7 +61,6 @@ public class PopularApplicationConfiguriation
        public PopularStreamProcessor streamProcessor(
                        PopularApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
-                       ZoneId zone,
                        KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                        WindowBytesStoreSupplier windowBytesStoreSupplier,
                        ConfigurableApplicationContext context)
@@ -71,7 +69,6 @@ public class PopularApplicationConfiguriation
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
-                               zone,
                                windowBytesStoreSupplier,
                                keyValueBytesStoreSupplier);
 
@@ -90,12 +87,6 @@ public class PopularApplicationConfiguriation
                return streamProcessor;
        }
 
-       @Bean
-       public ZoneId defaultZone()
-       {
-               return ZoneId.systemDefault();
-       }
-
        @Bean
        public WindowBytesStoreSupplier windowBytesStoreSupplier()
        {
index 9bcf0f7..883e124 100644 (file)
@@ -12,8 +12,6 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.time.Duration;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -34,14 +32,12 @@ public class PopularStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       ZoneId zone,
                        WindowBytesStoreSupplier windowBytesStoreSupplier,
                        KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               zone,
                                windowBytesStoreSupplier,
                                keyValueBytesStoreSupplier);
 
@@ -51,7 +47,6 @@ public class PopularStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       ZoneId zone,
                        WindowBytesStoreSupplier windowBytesStoreSupplier,
                        KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
@@ -74,8 +69,7 @@ public class PopularStreamProcessor
                                .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
                                .map((windowedWord, counter) -> new KeyValue<>(
                                                WindowedWord.of(
-                                                               ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
-                                                               ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+                                                               Long.toString(windowedWord.window().startTime().getEpochSecond()),
                                                                windowedWord.key().getWord()),
                                                WordCounter.of(windowedWord.key().getWord(), counter)))
                                .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
index b759f2e..b35ede6 100644 (file)
@@ -4,15 +4,12 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.time.ZonedDateTime;
-
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
 public class WindowedWord
 {
-  ZonedDateTime start;
-  ZonedDateTime end;
+  String time;
   String key;
 }
index 23ab524..aa99b8e 100644 (file)
@@ -20,8 +20,6 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-import java.time.Duration;
-import java.time.ZoneId;
 import java.util.Map;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
@@ -33,7 +31,6 @@ public class PopularStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
-  public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
 
 
   static TopologyTestDriver testDriver;
@@ -46,7 +43,6 @@ public class PopularStreamProcessorTopologyTest
     Topology topology = PopularStreamProcessor.buildTopology(
         IN,
         OUT,
-        ZONE,
         Stores.inMemoryWindowStore(
             WINDOW_STORE_NAME,
             WINDOW_SIZE.multipliedBy(2),
index 0d3036f..6d31575 100644 (file)
@@ -35,20 +35,20 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
-       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
-       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
-       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
-       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
+       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
+       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
+       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
+       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
+       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
 
-       private static Instant windowStart()
+       private static String windowStart()
        {
-               return windowBoundFor(0);
+               return toEpochSecond(windowBoundFor(0));
        }
 
-       private static Instant windowEnd()
+       private static String toEpochSecond(Instant instant)
        {
-               return windowBoundFor(WINDOW_SIZE.toSecondsPart());
+               return Long.toString(instant.getEpochSecond());
        }
 
        private static Instant windowBoundFor(int second)
@@ -190,8 +190,7 @@ class TestData
        {
                WindowedWord windowedWord = new WindowedWord();
 
-               windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
-               windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+               windowedWord.setTime(outputWindowedWord.getTime());
                windowedWord.setKey(outputWindowedWord.getKey());
 
                return windowedWord;
index 470f727..ab7cc59 100644 (file)
@@ -4,15 +4,12 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.time.Instant;
-
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
 public class OutputWindowedWord
 {
-  Instant start;
-  Instant end;
+  String time;
   String key;
 }