popular: 1.2.0 - Refined `WindowedWord` (timestamp as string)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.*;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.QueryableStoreTypes;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
13
14 import java.time.Duration;
15 import java.util.Map;
16 import java.util.Properties;
17 import java.util.stream.Collectors;
18
19
20 @Slf4j
21 public class PopularStreamProcessor
22 {
23         public static final String KEY_VALUE_STORE_NAME = "popular";
24         public static final String WINDOW_STORE_NAME = "popular-windows";
25         public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
26
27
28         public final KafkaStreams streams;
29
30
31         public PopularStreamProcessor(
32                         String inputTopic,
33                         String outputTopic,
34                         Properties properties,
35                         WindowBytesStoreSupplier windowBytesStoreSupplier,
36                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
37         {
38                 Topology topology = PopularStreamProcessor.buildTopology(
39                                 inputTopic,
40                                 outputTopic,
41                                 windowBytesStoreSupplier,
42                                 keyValueBytesStoreSupplier);
43
44                 streams = new KafkaStreams(topology, properties);
45         }
46
47         static Topology buildTopology(
48                         String inputTopic,
49                         String outputTopic,
50                         WindowBytesStoreSupplier windowBytesStoreSupplier,
51                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
52         {
53                 StreamsBuilder builder = new StreamsBuilder();
54
55                 builder
56                                 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
57                                 .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
58                                 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
59                                 .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
60                                 .groupByKey()
61                                 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
62                                 .count(
63                                                 Materialized
64                                                                 .<Word, Long>as(windowBytesStoreSupplier)
65                                                                 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
66                                                                 .withValueSerde(Serdes.Long()))
67                                 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
68                                 .toStream()
69                                 .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
70                                 .map((windowedWord, counter) -> new KeyValue<>(
71                                                 WindowedWord.of(
72                                                                 Long.toString(windowedWord.window().startTime().getEpochSecond()),
73                                                                 windowedWord.key().getWord()),
74                                                 WordCounter.of(windowedWord.key().getWord(), counter)))
75                                 .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
76                                 .toTable(
77                                                 Materialized
78                                                                 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
79                                                                 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
80                                                                 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
81                                 .toStream()
82                                 .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
83                                 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
84
85                 Topology topology = builder.build();
86                 log.info("\n\n{}", topology.describe());
87
88                 return topology;
89         }
90
91         ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
92         {
93                 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
94         }
95
96         public void start()
97         {
98                 log.info("Starting Stream-Processor");
99                 streams.start();
100         }
101
102         public void stop()
103         {
104                 log.info("Stopping Stream-Processor");
105                 streams.close();
106         }
107
108
109
110         public static JsonSerde<User> inKeySerde()
111         {
112                 return new JsonSerde<>(User.class);
113         }
114
115         public static JsonSerde<Word> inValueSerde()
116         {
117                 return new JsonSerde<>(Word.class);
118         }
119
120         public static JsonSerde<WindowedWord> outKeySerde()
121         {
122                 return serde(true);
123         }
124
125         public static JsonSerde<WordCounter> outValueSerde()
126         {
127                 return serde(false);
128         }
129
130         public static <T> JsonSerde<T> serde(boolean isKey)
131         {
132                 JsonSerde<T> serde = new JsonSerde<>();
133                 serde.configure(
134                                 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
135                                 isKey);
136                 return serde;
137         }
138
139         private static String typeMappingsConfig()
140         {
141                 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
142         }
143
144         public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
145         {
146                 return Map.of(
147                                                 "word", wordClass,
148                                                 "counter", wordCounterClass)
149                                 .entrySet()
150                                 .stream()
151                                 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
152                                 .collect(Collectors.joining(","));
153         }
154 }