1235132e6e305b156a65ee9a327e9c78a5c4e762
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Materialized;
6 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
7 import org.apache.kafka.streams.state.QueryableStoreTypes;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9
10 import java.util.Properties;
11
12
13 @Slf4j
14 public class Top10StreamProcessor
15 {
16         public static final String STORE_NAME= "top10";
17
18         public final KafkaStreams streams;
19
20
21         public Top10StreamProcessor(
22                         String inputTopic,
23                         String outputTopic,
24                         Properties props,
25                         KeyValueBytesStoreSupplier storeSupplier)
26         {
27                 Topology topology = Top10StreamProcessor.buildTopology(
28                                 inputTopic,
29                                 outputTopic,
30                                 storeSupplier);
31
32                 streams = new KafkaStreams(topology, props);
33         }
34
35         static Topology buildTopology(
36                         String inputTopic,
37                         String outputTopic,
38                         KeyValueBytesStoreSupplier storeSupplier)
39         {
40                 StreamsBuilder builder = new StreamsBuilder();
41
42                 builder
43                                 .<Key, Entry>stream(inputTopic)
44                                 .map((key, entry) -> new KeyValue<>(
45                                                 Stats.of(key.getType(), key.getChannel()),
46                                                 entry))
47                                 .groupByKey()
48                                 .aggregate(
49                                                 () -> new Ranking(),
50                                                 (stats, entry, ranking) -> ranking.add(entry),
51                                                 Materialized.as(storeSupplier))
52                                 .toStream()
53                                 .to(outputTopic);
54
55                 Topology topology = builder.build();
56                 log.info("\n\n{}", topology.describe());
57
58                 return topology;
59         }
60
61         ReadOnlyKeyValueStore<Stats, Ranking> getStore()
62         {
63                 return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
64         }
65
66         public void start()
67         {
68                 log.info("Starting Stream-Processor");
69                 streams.start();
70         }
71
72         public void stop()
73         {
74                 log.info("Stopping Stream-Processor");
75                 streams.close();
76         }
77 }