popular: 1.1.0 - Implemented suppression of intermediate results
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessor.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.*;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.QueryableStoreTypes;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
13
14 import java.time.Duration;
15 import java.time.ZoneId;
16 import java.time.ZonedDateTime;
17 import java.util.Map;
18 import java.util.Properties;
19 import java.util.stream.Collectors;
20
21
22 @Slf4j
23 public class PopularStreamProcessor
24 {
25         public static final String KEY_VALUE_STORE_NAME = "popular";
26         public static final String WINDOW_STORE_NAME = "popular-windows";
27         public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
28
29
30         public final KafkaStreams streams;
31
32
33         public PopularStreamProcessor(
34                         String inputTopic,
35                         String outputTopic,
36                         Properties properties,
37                         ZoneId zone,
38                         WindowBytesStoreSupplier windowBytesStoreSupplier,
39                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
40         {
41                 Topology topology = PopularStreamProcessor.buildTopology(
42                                 inputTopic,
43                                 outputTopic,
44                                 zone,
45                                 windowBytesStoreSupplier,
46                                 keyValueBytesStoreSupplier);
47
48                 streams = new KafkaStreams(topology, properties);
49         }
50
51         static Topology buildTopology(
52                         String inputTopic,
53                         String outputTopic,
54                         ZoneId zone,
55                         WindowBytesStoreSupplier windowBytesStoreSupplier,
56                         KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
57         {
58                 StreamsBuilder builder = new StreamsBuilder();
59
60                 builder
61                                 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
62                                 .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
63                                 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
64                                 .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
65                                 .groupByKey()
66                                 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
67                                 .count(
68                                                 Materialized
69                                                                 .<Word, Long>as(windowBytesStoreSupplier)
70                                                                 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
71                                                                 .withValueSerde(Serdes.Long()))
72                                 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
73                                 .toStream()
74                                 .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
75                                 .map((windowedWord, counter) -> new KeyValue<>(
76                                                 WindowedWord.of(
77                                                                 ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
78                                                                 ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
79                                                                 windowedWord.key().getWord()),
80                                                 WordCounter.of(windowedWord.key().getWord(), counter)))
81                                 .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
82                                 .toTable(
83                                                 Materialized
84                                                                 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
85                                                                 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
86                                                                 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
87                                 .toStream()
88                                 .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
89                                 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
90
91                 Topology topology = builder.build();
92                 log.info("\n\n{}", topology.describe());
93
94                 return topology;
95         }
96
97         ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
98         {
99                 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
100         }
101
102         public void start()
103         {
104                 log.info("Starting Stream-Processor");
105                 streams.start();
106         }
107
108         public void stop()
109         {
110                 log.info("Stopping Stream-Processor");
111                 streams.close();
112         }
113
114
115
116         public static JsonSerde<User> inKeySerde()
117         {
118                 return new JsonSerde<>(User.class);
119         }
120
121         public static JsonSerde<Word> inValueSerde()
122         {
123                 return new JsonSerde<>(Word.class);
124         }
125
126         public static JsonSerde<WindowedWord> outKeySerde()
127         {
128                 return serde(true);
129         }
130
131         public static JsonSerde<WordCounter> outValueSerde()
132         {
133                 return serde(false);
134         }
135
136         public static <T> JsonSerde<T> serde(boolean isKey)
137         {
138                 JsonSerde<T> serde = new JsonSerde<>();
139                 serde.configure(
140                                 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
141                                 isKey);
142                 return serde;
143         }
144
145         private static String typeMappingsConfig()
146         {
147                 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
148         }
149
150         public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
151         {
152                 return Map.of(
153                                                 "word", wordClass,
154                                                 "counter", wordCounterClass)
155                                 .entrySet()
156                                 .stream()
157                                 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
158                                 .collect(Collectors.joining(","));
159         }
160 }