343ab4d85e970199981798a89efea7714c70023c
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.kstream.Materialized;
6 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
7 import org.apache.kafka.streams.state.QueryableStoreTypes;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9
10 import java.util.Properties;
11
12
13 @Slf4j
14 public class Top10StreamProcessor
15 {
16         public final KafkaStreams streams;
17
18
19         public Top10StreamProcessor(
20                         String inputTopic,
21                         String outputTopic,
22                         Properties props,
23                         KeyValueBytesStoreSupplier storeSupplier)
24         {
25                 Topology topology = Top10StreamProcessor.buildTopology(
26                                 inputTopic,
27                                 outputTopic,
28                                 storeSupplier);
29
30                 streams = new KafkaStreams(topology, props);
31         }
32
33         static Topology buildTopology(
34                         String inputTopic,
35                         String outputTopic,
36                         KeyValueBytesStoreSupplier storeSupplier)
37         {
38                 StreamsBuilder builder = new StreamsBuilder();
39
40                 builder
41                                 .<Key, Entry>stream(inputTopic)
42                                 .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
43                                 .groupByKey()
44                                 .aggregate(
45                                                 () -> new Ranking(),
46                                                 (user, entry, ranking) -> ranking.add(entry),
47                                                 Materialized.as(storeSupplier))
48                                 .toStream()
49                                 .to(outputTopic);
50
51                 Topology topology = builder.build();
52                 log.info("\n\n{}", topology.describe());
53
54                 return topology;
55         }
56
57         ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
58         {
59                 return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
60         }
61
62         public void start()
63         {
64                 log.info("Starting Stream-Processor");
65                 streams.start();
66         }
67
68         public void stop()
69         {
70                 log.info("Stopping Stream-Processor");
71                 streams.close();
72         }
73 }