popular: 1.2.0 - Refined `WindowedWord` (timestamp as string)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
index 7bba240..883e124 100644 (file)
@@ -1,16 +1,17 @@
 package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -19,7 +20,9 @@ import java.util.stream.Collectors;
 @Slf4j
 public class PopularStreamProcessor
 {
-       public static final String STORE_NAME = "popular";
+       public static final String KEY_VALUE_STORE_NAME = "popular";
+       public static final String WINDOW_STORE_NAME = "popular-windows";
+       public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
 
 
        public final KafkaStreams streams;
@@ -29,12 +32,14 @@ public class PopularStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier);
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
@@ -42,20 +47,39 @@ public class PopularStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
-                               .map((key, word) -> new KeyValue<>(word, word))
+                               .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
+                               .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
+                               .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
                                .groupByKey()
+                               .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
                                                Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+                                                               .<Word, Long>as(windowBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+                                                               .withValueSerde(Serdes.Long()))
+                               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                                .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
+                               .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
+                               .map((windowedWord, counter) -> new KeyValue<>(
+                                               WindowedWord.of(
+                                                               Long.toString(windowedWord.window().startTime().getEpochSecond()),
+                                                               windowedWord.key().getWord()),
+                                               WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
+                               .toTable(
+                                               Materialized
+                                                               .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+                                                               .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
+                               .toStream()
+                               .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();
@@ -64,9 +88,9 @@ public class PopularStreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<Word, Long> getStore()
+       ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
        {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+               return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
 
        public void start()
@@ -93,7 +117,7 @@ public class PopularStreamProcessor
                return new JsonSerde<>(Word.class);
        }
 
-       public static JsonSerde<Word> outKeySerde()
+       public static JsonSerde<WindowedWord> outKeySerde()
        {
                return serde(true);
        }
@@ -114,7 +138,7 @@ public class PopularStreamProcessor
 
        private static String typeMappingsConfig()
        {
-               return typeMappingsConfig(Word.class, WordCounter.class);
+               return typeMappingsConfig(WindowedWord.class, WordCounter.class);
        }
 
        public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)