70ead8796c49617037fa39a176aa44390417a272
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Materialized;
6 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
7 import org.apache.kafka.streams.state.QueryableStoreTypes;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9
10 import java.util.Properties;
11
12
13 @Slf4j
14 public class Top10StreamProcessor
15 {
16         public static final String STORE_NAME= "top10";
17
18         public final KafkaStreams streams;
19
20
21         public Top10StreamProcessor(
22                         String inputTopic,
23                         String outputTopic,
24                         Properties props,
25                         KeyValueBytesStoreSupplier storeSupplier)
26         {
27                 Topology topology = Top10StreamProcessor.buildTopology(
28                                 inputTopic,
29                                 outputTopic,
30                                 storeSupplier);
31
32                 streams = new KafkaStreams(topology, props);
33         }
34
35         static Topology buildTopology(
36                         String inputTopic,
37                         String outputTopic,
38                         KeyValueBytesStoreSupplier storeSupplier)
39         {
40                 StreamsBuilder builder = new StreamsBuilder();
41
42                 builder
43                                 .<Key, Entry>stream(inputTopic)
44                                 .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
45                                 .groupByKey()
46                                 .aggregate(
47                                                 () -> new Ranking(),
48                                                 (user, entry, ranking) -> ranking.add(entry),
49                                                 Materialized.as(storeSupplier))
50                                 .toStream()
51                                 .to(outputTopic);
52
53                 Topology topology = builder.build();
54                 log.info("\n\n{}", topology.describe());
55
56                 return topology;
57         }
58
59         ReadOnlyKeyValueStore<User, Ranking> getStore()
60         {
61                 return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
62         }
63
64         public void start()
65         {
66                 log.info("Starting Stream-Processor");
67                 streams.start();
68         }
69
70         public void stop()
71         {
72                 log.info("Stopping Stream-Processor");
73                 streams.close();
74         }
75 }