top10: 1.2.1 - (RED) Added an assertion regarding the expected state
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;
12
13
14 @Slf4j
15 public class Top10StreamProcessor
16 {
17         public final KafkaStreams streams;
18
19
20         public Top10StreamProcessor(
21                         String inputTopic,
22                         String outputTopic,
23                         Properties props)
24         {
25                 Topology topology = Top10StreamProcessor.buildTopology(
26                                 inputTopic,
27                                 outputTopic,
28                                 null);
29
30                 streams = new KafkaStreams(topology, props);
31         }
32
33         static Topology buildTopology(
34                         String inputTopic,
35                         String outputTopic,
36                         KeyValueBytesStoreSupplier storeSupplier)
37         {
38                 StreamsBuilder builder = new StreamsBuilder();
39
40                 builder
41                                 .<Key, Entry>stream(inputTopic)
42                                 .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
43                                 .groupByKey()
44                                 .aggregate(
45                                                 () -> new Ranking(),
46                                                 (user, entry, ranking) -> ranking.add(entry))
47                                 .toStream()
48                                 .to(outputTopic);
49
50                 Topology topology = builder.build();
51                 log.info("\n\n{}", topology.describe());
52
53                 return topology;
54         }
55
56         ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
57         {
58                 return null;
59         }
60
61         public void start()
62         {
63                 log.info("Starting Stream-Processor");
64                 streams.start();
65         }
66
67         public void stop()
68         {
69                 log.info("Stopping Stream-Processor");
70                 streams.close();
71         }
72 }