bccb253b0a9cb92b8863f3954b6028a6dcedb09e
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.StreamsBuilder;
9 import org.apache.kafka.streams.Topology;
10 import org.apache.kafka.streams.kstream.KStream;
11 import org.apache.kafka.streams.kstream.Materialized;
12 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
13 import org.apache.kafka.streams.kstream.*;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15
16 import java.util.Properties;
17
18
19 @Slf4j
20 public class CounterStreamProcessor
21 {
22         public final KafkaStreams streams;
23
24
25         public CounterStreamProcessor(
26                         String inputTopic,
27                         String outputTopic,
28                         Properties properties,
29                         KeyValueBytesStoreSupplier storeSupplier,
30                         ObjectMapper mapper)
31         {
32                 Topology topology = CounterStreamProcessor.buildTopology(
33                                 inputTopic,
34                                 outputTopic,
35                                 storeSupplier,
36                                 mapper);
37
38                 streams = new KafkaStreams(topology, properties);
39         }
40
41         static Topology buildTopology(
42                         String inputTopic,
43                         String outputTopic,
44                         KeyValueBytesStoreSupplier storeSupplier,
45                         ObjectMapper mapper)
46         {
47                 StreamsBuilder builder = new StreamsBuilder();
48
49                 KStream<String, Word> source = builder.stream(
50                                 inputTopic,
51                                 Consumed.with(
52                                                 Serdes.String(),
53                                                 new JsonSerde<>(Word.class)
54                                                                 .ignoreTypeHeaders()));
55
56                 source
57                                 .map((key, word) -> new KeyValue<>(word, word))
58                                 .groupByKey(Grouped.with(
59                                                 new JsonSerde<>(Word.class)
60                                                                 .forKeys()
61                                                                 .noTypeInfo(),
62                                                 new JsonSerde<>(Word.class)
63                                                                 .noTypeInfo()))
64                                 .count(Materialized.as(storeSupplier))
65                                 .toStream()
66                                 .map((word, count) -> new KeyValue<>(word, WordCount.of(word.getUser(), word.getWord(), count)))
67                                 .to(
68                                                 outputTopic,
69                                                 Produced.with(
70                                                                 new JsonSerde<>(Word.class)
71                                                                                 .forKeys()
72                                                                                 .noTypeInfo(),
73                                                                 new JsonSerde<>(WordCount.class)
74                                                                                 .noTypeInfo()));
75
76                 Topology topology = builder.build();
77                 log.info("\n\n{}", topology.describe());
78
79                 return topology;
80         }
81
82         public void start()
83         {
84                 log.info("Starting Stream-Processor");
85                 streams.start();
86         }
87
88         public void stop()
89         {
90                 log.info("Stopping Stream-Processor");
91                 streams.close();
92         }
93 }