counter: 1.4.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Consumed;
6 import org.apache.kafka.streams.kstream.Materialized;
7 import org.apache.kafka.streams.kstream.Produced;
8 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
9 import org.apache.kafka.streams.state.QueryableStoreTypes;
10 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
13
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.stream.Collectors;
17
18
19 @Slf4j
20 public class CounterStreamProcessor
21 {
22         public static final String TYPE = "COUNTER";
23         public static final String STORE_NAME = "counter";
24
25
26         public final KafkaStreams streams;
27
28
29         public CounterStreamProcessor(
30                         String inputTopic,
31                         String outputTopic,
32                         Properties properties,
33                         KeyValueBytesStoreSupplier storeSupplier)
34         {
35                 Topology topology = CounterStreamProcessor.buildTopology(
36                                 inputTopic,
37                                 outputTopic,
38                                 storeSupplier);
39
40                 streams = new KafkaStreams(topology, properties);
41         }
42
43         static Topology buildTopology(
44                         String inputTopic,
45                         String outputTopic,
46                         KeyValueBytesStoreSupplier storeSupplier)
47         {
48                 StreamsBuilder builder = new StreamsBuilder();
49
50                 builder
51                                 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
52                                 .mapValues(word -> Word.of(word.getUser(), word.getWord()))
53                                 .map((key, word) -> new KeyValue<>(word, word))
54                                 .groupByKey()
55                                 .count(
56                                                 Materialized
57                                                                 .<Word, Long>as(storeSupplier)
58                                                                 .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
59                                 .toStream()
60                                 .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
61                                 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
62
63                 Topology topology = builder.build();
64                 log.info("\n\n{}", topology.describe());
65
66                 return topology;
67         }
68
69         ReadOnlyKeyValueStore<Word, Long> getStore()
70         {
71                 return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
72         }
73
74         public void start()
75         {
76                 log.info("Starting Stream-Processor");
77                 streams.start();
78         }
79
80         public void stop()
81         {
82                 log.info("Stopping Stream-Processor");
83                 streams.close();
84         }
85
86
87
88         public static JsonSerde<User> inKeySerde()
89         {
90                 return new JsonSerde<>(User.class);
91         }
92
93         public static JsonSerde<UserWord> inValueSerde()
94         {
95                 return new JsonSerde<>(UserWord.class);
96         }
97
98         public static JsonSerde<Word> outKeySerde()
99         {
100                 return serde(true);
101         }
102
103         public static JsonSerde<WordCounter> outValueSerde()
104         {
105                 return serde(false);
106         }
107
108         public static <T> JsonSerde<T> serde(boolean isKey)
109         {
110                 JsonSerde<T> serde = new JsonSerde<>();
111                 serde.configure(
112                                 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
113                                 isKey);
114                 return serde;
115         }
116
117         private static String typeMappingsConfig()
118         {
119                 return typeMappingsConfig(Word.class, WordCounter.class);
120         }
121
122         public static String typeMappingsConfig(Class keyClass, Class counterClass)
123         {
124                 return Map.of(
125                                                 "key", keyClass,
126                                                 "counter", counterClass)
127                                 .entrySet()
128                                 .stream()
129                                 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
130                                 .collect(Collectors.joining(","));
131         }
132 }