From: Kai Moritz Date: Sat, 22 Jun 2024 09:23:25 +0000 (+0200) Subject: popular: 1.2.0 - Refined `WindowedWord` (timestamp as string) X-Git-Tag: popular-1.2.0 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=dc7be5b9ac6d8b441039f8503b61652e74f4faee;p=demos%2Fkafka%2Fwordcount popular: 1.2.0 - Refined `WindowedWord` (timestamp as string) * `WindowedWord` only stores the start-time of the window. * `WindowedWord` stores the seconds-since-epoce as string. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java index 45a0a57..2cbe07a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -14,7 +14,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; -import java.time.Duration; import java.time.ZoneId; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -62,7 +61,6 @@ public class PopularApplicationConfiguriation public PopularStreamProcessor streamProcessor( PopularApplicationProperties applicationProperties, Properties streamProcessorProperties, - ZoneId zone, KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, WindowBytesStoreSupplier windowBytesStoreSupplier, ConfigurableApplicationContext context) @@ -71,7 +69,6 @@ public class PopularApplicationConfiguriation applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), streamProcessorProperties, - zone, windowBytesStoreSupplier, keyValueBytesStoreSupplier); @@ -90,12 +87,6 @@ public class PopularApplicationConfiguriation return streamProcessor; } - @Bean - public ZoneId defaultZone() - { - return ZoneId.systemDefault(); - } - @Bean public WindowBytesStoreSupplier windowBytesStoreSupplier() { diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 9bcf0f7..883e124 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -12,8 +12,6 @@ import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Duration; -import java.time.ZoneId; -import java.time.ZonedDateTime; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -34,14 +32,12 @@ public class PopularStreamProcessor String inputTopic, String outputTopic, Properties properties, - ZoneId zone, WindowBytesStoreSupplier windowBytesStoreSupplier, KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { Topology topology = PopularStreamProcessor.buildTopology( inputTopic, outputTopic, - zone, windowBytesStoreSupplier, keyValueBytesStoreSupplier); @@ -51,7 +47,6 @@ public class PopularStreamProcessor static Topology buildTopology( String inputTopic, String outputTopic, - ZoneId zone, WindowBytesStoreSupplier windowBytesStoreSupplier, KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { @@ -74,8 +69,7 @@ public class PopularStreamProcessor .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter)) .map((windowedWord, counter) -> new KeyValue<>( WindowedWord.of( - ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone), - ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone), + Long.toString(windowedWord.window().startTime().getEpochSecond()), windowedWord.key().getWord()), WordCounter.of(windowedWord.key().getWord(), counter))) .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter)) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java index b759f2e..b35ede6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java @@ -4,15 +4,12 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.time.ZonedDateTime; - @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") public class WindowedWord { - ZonedDateTime start; - ZonedDateTime end; + String time; String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index 23ab524..aa99b8e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -20,8 +20,6 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import java.time.Duration; -import java.time.ZoneId; import java.util.Map; import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig; @@ -33,7 +31,6 @@ public class PopularStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; - public static final ZoneId ZONE = ZoneId.of("Europe/Berlin"); static TopologyTestDriver testDriver; @@ -46,7 +43,6 @@ public class PopularStreamProcessorTopologyTest Topology topology = PopularStreamProcessor.buildTopology( IN, OUT, - ZONE, Stores.inMemoryWindowStore( WINDOW_STORE_NAME, WINDOW_SIZE.multipliedBy(2), diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 0d3036f..6d31575 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -35,20 +35,20 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO); - static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT); - static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH); - static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH); - static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S); + static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO); + static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT); + static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH); + static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH); + static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S); - private static Instant windowStart() + private static String windowStart() { - return windowBoundFor(0); + return toEpochSecond(windowBoundFor(0)); } - private static Instant windowEnd() + private static String toEpochSecond(Instant instant) { - return windowBoundFor(WINDOW_SIZE.toSecondsPart()); + return Long.toString(instant.getEpochSecond()); } private static Instant windowBoundFor(int second) @@ -190,8 +190,7 @@ class TestData { WindowedWord windowedWord = new WindowedWord(); - windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone())); - windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone())); + windowedWord.setTime(outputWindowedWord.getTime()); windowedWord.setKey(outputWindowedWord.getKey()); return windowedWord; diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java index 470f727..ab7cc59 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java @@ -4,15 +4,12 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.time.Instant; - @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") public class OutputWindowedWord { - Instant start; - Instant end; + String time; String key; }