788ae202f0cc31f783149f20ee86a41ae2340e35
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.Consumed;
7 import org.apache.kafka.streams.kstream.Materialized;
8 import org.apache.kafka.streams.kstream.Produced;
9 import org.apache.kafka.streams.kstream.TimeWindows;
10 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
11 import org.apache.kafka.streams.state.QueryableStoreTypes;
12 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
13 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16
17 import java.time.Duration;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
20 import java.util.Map;
21 import java.util.Properties;
22 import java.util.stream.Collectors;
23
24
25 @Slf4j
26 public class PopularStreamProcessor
27 {
28         public static final String KEY_VALUE_STORE_NAME = "popular";
29         public static final String WINDOW_STORE_NAME = "popular-windows";
30         public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
31
32
33         public final KafkaStreams streams;
34
35
36         public PopularStreamProcessor(
37                         String inputTopic,
38                         String outputTopic,
39                         Properties properties,
40                         ZoneId zone,
41                         WindowBytesStoreSupplier windowBytesStoreSupplier,
42                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
43         {
44                 Topology topology = PopularStreamProcessor.buildTopology(
45                                 inputTopic,
46                                 outputTopic,
47                                 zone,
48                                 windowBytesStoreSupplier,
49                                 keyValueBytesStoreSupplier);
50
51                 streams = new KafkaStreams(topology, properties);
52         }
53
54         static Topology buildTopology(
55                         String inputTopic,
56                         String outputTopic,
57                         ZoneId zone,
58                         WindowBytesStoreSupplier windowBytesStoreSupplier,
59                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
60         {
61                 StreamsBuilder builder = new StreamsBuilder();
62
63                 builder
64                                 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
65                                 .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
66                                 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
67                                 .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
68                                 .groupByKey()
69                                 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
70                                 .count(
71                                                 Materialized
72                                                                 .<Word, Long>as(windowBytesStoreSupplier)
73                                                                 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
74                                                                 .withValueSerde(Serdes.Long()))
75                                 .toStream()
76                                 .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
77                                 .map((windowedWord, counter) -> new KeyValue<>(
78                                                 WindowedWord.of(
79                                                                 ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
80                                                                 ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
81                                                                 windowedWord.key().getWord()),
82                                                 WordCounter.of(windowedWord.key().getWord(), counter)))
83                                 .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
84                                 .toTable(
85                                                 Materialized
86                                                                 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
87                                                                 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
88                                                                 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
89                                 .toStream()
90                                 .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
91                                 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
92
93                 Topology topology = builder.build();
94                 log.info("\n\n{}", topology.describe());
95
96                 return topology;
97         }
98
99         ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
100         {
101                 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
102         }
103
104         public void start()
105         {
106                 log.info("Starting Stream-Processor");
107                 streams.start();
108         }
109
110         public void stop()
111         {
112                 log.info("Stopping Stream-Processor");
113                 streams.close();
114         }
115
116
117
118         public static JsonSerde<User> inKeySerde()
119         {
120                 return new JsonSerde<>(User.class);
121         }
122
123         public static JsonSerde<Word> inValueSerde()
124         {
125                 return new JsonSerde<>(Word.class);
126         }
127
128         public static JsonSerde<WindowedWord> outKeySerde()
129         {
130                 return serde(true);
131         }
132
133         public static JsonSerde<WordCounter> outValueSerde()
134         {
135                 return serde(false);
136         }
137
138         public static <T> JsonSerde<T> serde(boolean isKey)
139         {
140                 JsonSerde<T> serde = new JsonSerde<>();
141                 serde.configure(
142                                 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
143                                 isKey);
144                 return serde;
145         }
146
147         private static String typeMappingsConfig()
148         {
149                 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
150         }
151
152         public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
153         {
154                 return Map.of(
155                                                 "word", wordClass,
156                                                 "counter", wordCounterClass)
157                                 .entrySet()
158                                 .stream()
159                                 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
160                                 .collect(Collectors.joining(","));
161         }
162 }