counter: 1.3.1 - Refined/Simplified the type-mapping
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Consumed;
6 import org.apache.kafka.streams.kstream.Materialized;
7 import org.apache.kafka.streams.kstream.Produced;
8 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
9 import org.apache.kafka.streams.state.QueryableStoreTypes;
10 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
13
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.stream.Collectors;
17
18
19 @Slf4j
20 public class CounterStreamProcessor
21 {
22         public static final String STORE_NAME = "counter";
23
24
25         public final KafkaStreams streams;
26
27
28         public CounterStreamProcessor(
29                         String inputTopic,
30                         String outputTopic,
31                         Properties properties,
32                         KeyValueBytesStoreSupplier storeSupplier)
33         {
34                 Topology topology = CounterStreamProcessor.buildTopology(
35                                 inputTopic,
36                                 outputTopic,
37                                 storeSupplier);
38
39                 streams = new KafkaStreams(topology, properties);
40         }
41
42         static Topology buildTopology(
43                         String inputTopic,
44                         String outputTopic,
45                         KeyValueBytesStoreSupplier storeSupplier)
46         {
47                 StreamsBuilder builder = new StreamsBuilder();
48
49                 builder
50                                 .stream(
51                                                 inputTopic,
52                                                 Consumed.with(
53                                                                 new JsonSerde<>(User.class),
54                                                                 new JsonSerde<>(Word.class)))
55                                 .map((key, word) -> new KeyValue<>(word, word))
56                                 .groupByKey()
57                                 .count(
58                                                 Materialized
59                                                                 .<Word, Long>as(storeSupplier)
60                                                                 .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
61                                 .toStream()
62                                 .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
63                                 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
64
65                 Topology topology = builder.build();
66                 log.info("\n\n{}", topology.describe());
67
68                 return topology;
69         }
70
71         ReadOnlyKeyValueStore<Word, Long> getStore()
72         {
73                 return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
74         }
75
76         public void start()
77         {
78                 log.info("Starting Stream-Processor");
79                 streams.start();
80         }
81
82         public void stop()
83         {
84                 log.info("Stopping Stream-Processor");
85                 streams.close();
86         }
87
88
89
90         public static JsonSerde<Word> outKeySerde()
91         {
92                 return serde(true);
93         }
94
95         public static JsonSerde<WordCounter> outValueSerde()
96         {
97                 return serde(false);
98         }
99
100         public static <T> JsonSerde<T> serde(boolean isKey)
101         {
102                 JsonSerde<T> serde = new JsonSerde<>();
103                 serde.configure(
104                                 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
105                                 isKey);
106                 return serde;
107         }
108
109         private static String typeMappingsConfig()
110         {
111                 return typeMappings()
112                                 .entrySet()
113                                 .stream()
114                                 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
115                                 .collect(Collectors.joining(","));
116         }
117
118         private static Map<String, Class> typeMappings()
119         {
120                 return Map.of(
121                                 "word", Word.class,
122                                 "counter", WordCounter.class);
123         }
124 }