From f4d0e5d87a6673bb19fa13171a1a829d7adda5cb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 11:48:24 +0200 Subject: [PATCH] WIP --- .../PopularApplicationConfiguriation.java | 34 ++++++++++++++++--- .../popular/PopularStreamProcessor.java | 31 +++++++++++++---- .../kafka/wordcount/popular/TimeWindow.java | 17 ++++++++++ .../popular/PopularApplicationIT.java | 4 +-- .../PopularStreamProcessorTopologyTest.java | 23 +++++++++---- 5 files changed, 89 insertions(+), 20 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/popular/TimeWindow.java diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java index 73ea920..3f8c6b9 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -13,10 +14,13 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; +import java.time.Duration; +import java.time.ZoneId; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -65,14 +69,18 @@ public class PopularApplicationConfiguriation public PopularStreamProcessor streamProcessor( PopularApplicationProperties applicationProperties, Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, + ZoneId zone, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, + WindowBytesStoreSupplier windowBytesStoreSupplier, ConfigurableApplicationContext context) { PopularStreamProcessor streamProcessor = new PopularStreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), streamProcessorProperties, - storeSupplier); + zone, + windowBytesStoreSupplier, + keyValueBytesStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -90,8 +98,24 @@ public class PopularApplicationConfiguriation } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public ZoneId defaultZone() { - return Stores.persistentKeyValueStore(STORE_NAME); + return ZoneId.systemDefault(); + } + + @Bean + public WindowBytesStoreSupplier windowBytesStoreSupplier() + { + return Stores.persistentWindowStore( + KEY_VALUE_STORE_NAME, + Duration.ofSeconds(60), + Duration.ofSeconds(30), + true); + } + + @Bean + public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier() + { + return Stores.persistentKeyValueStore(WINDOW_STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index e6fd846..006eaea 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -4,18 +4,24 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.springframework.kafka.support.serializer.JsonSerde; +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Properties; @Slf4j public class PopularStreamProcessor { - public static final String STORE_NAME = "popular"; + public static final String KEY_VALUE_STORE_NAME = "popular"; + public static final String WINDOW_STORE_NAME = "popular-windows"; public final KafkaStreams streams; @@ -25,12 +31,16 @@ public class PopularStreamProcessor String inputTopic, String outputTopic, Properties properties, - KeyValueBytesStoreSupplier storeSupplier) + ZoneId zone, + WindowBytesStoreSupplier windowBytesStoreSupplier, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { Topology topology = PopularStreamProcessor.buildTopology( inputTopic, outputTopic, - storeSupplier); + zone, + windowBytesStoreSupplier, + keyValueBytesStoreSupplier); streams = new KafkaStreams(topology, properties); } @@ -38,7 +48,9 @@ public class PopularStreamProcessor static Topology buildTopology( String inputTopic, String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) + ZoneId zone, + WindowBytesStoreSupplier windowBytesStoreSupplier, + KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); @@ -47,12 +59,17 @@ public class PopularStreamProcessor source .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30))) .count( Materialized - .as(storeSupplier) + .as(windowBytesStoreSupplier) .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) .toStream() - .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) + .map((windowedWord, counter) -> new KeyValue<>( + TimeWindow.of( + ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone), + ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone)), + WordCounter.of(windowedWord.key(), counter))) .to(outputTopic); Topology topology = builder.build(); @@ -63,7 +80,7 @@ public class PopularStreamProcessor ReadOnlyKeyValueStore getStore() { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/TimeWindow.java b/src/main/java/de/juplo/kafka/wordcount/popular/TimeWindow.java new file mode 100644 index 0000000..6fa721d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/TimeWindow.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.popular; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.ZonedDateTime; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TimeWindow +{ + ZonedDateTime start; + ZonedDateTime end; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java index 1322b52..1cb4307 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -29,7 +29,7 @@ import java.time.Duration; import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; import static org.awaitility.Awaitility.await; @@ -159,7 +159,7 @@ public class PopularApplicationIT @Bean KeyValueBytesStoreSupplier inMemoryStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java index cb8b485..a3aec6f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -19,7 +19,12 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.time.Duration; +import java.time.ZoneId; + import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME; @Slf4j @@ -27,7 +32,7 @@ public class PopularStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final ZoneId ZONE = ZoneId.of("Europe/Berlin"); TopologyTestDriver testDriver; @@ -41,7 +46,13 @@ public class PopularStreamProcessorTopologyTest Topology topology = PopularStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); + ZONE, + Stores.inMemoryWindowStore( + WINDOW_STORE_NAME, + Duration.ofSeconds(6), + Duration.ofSeconds(3), + true), + Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -73,12 +84,12 @@ public class PopularStreamProcessorTopologyTest .readRecordsToList() .forEach(record -> receivedMessages.add(record.key(), record.value())); - TestData.assertExpectedMessages(receivedMessages); + // TestData.assertExpectedMessages(receivedMessages); - TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); - TestData.assertExpectedLastMessagesForWord(receivedMessages); + // TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + // TestData.assertExpectedLastMessagesForWord(receivedMessages); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME); TestData.assertExpectedState(store); } -- 2.20.1