popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
index 7bba240..9ba9ad2 100644 (file)
@@ -1,16 +1,22 @@
 package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -19,7 +25,9 @@ import java.util.stream.Collectors;
 @Slf4j
 public class PopularStreamProcessor
 {
-       public static final String STORE_NAME = "popular";
+       public static final String KEY_VALUE_STORE_NAME = "popular";
+       public static final String WINDOW_STORE_NAME = "popular-windows";
+       public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
 
 
        public final KafkaStreams streams;
@@ -29,12 +37,16 @@ public class PopularStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier);
+                               zone,
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
@@ -42,20 +54,35 @@ public class PopularStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
-                               .map((key, word) -> new KeyValue<>(word, word))
+                               .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
                                .groupByKey()
+                               .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
                                                Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+                                                               .<Word, Long>as(windowBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+                                                               .withValueSerde(Serdes.Long()))
+                               .toStream()
+                               .map((windowedWord, counter) -> new KeyValue<>(
+                                               WindowedWord.of(
+                                                               ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
+                                                               ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+                                                               windowedWord.key().getWord()),
+                                               WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .toTable(
+                                               Materialized
+                                                               .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+                                                               .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
                                .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();
@@ -64,9 +91,9 @@ public class PopularStreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<Word, Long> getStore()
+       ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
        {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+               return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
 
        public void start()
@@ -93,7 +120,7 @@ public class PopularStreamProcessor
                return new JsonSerde<>(Word.class);
        }
 
-       public static JsonSerde<Word> outKeySerde()
+       public static JsonSerde<WindowedWord> outKeySerde()
        {
                return serde(true);
        }
@@ -114,7 +141,7 @@ public class PopularStreamProcessor
 
        private static String typeMappingsConfig()
        {
-               return typeMappingsConfig(Word.class, WordCounter.class);
+               return typeMappingsConfig(WindowedWord.class, WordCounter.class);
        }
 
        public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)