2b2cf93ff8231e307dcb786176fd10e4349db67c
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.KeyValue;
6 import org.apache.kafka.streams.StreamsBuilder;
7 import org.apache.kafka.streams.Topology;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class Top10StreamProcessor
14 {
15         public final KafkaStreams streams;
16
17
18         public Top10StreamProcessor(
19                         String inputTopic,
20                         String outputTopic,
21                         Properties props)
22         {
23                 Topology topology = Top10StreamProcessor.buildTopology(
24                                 inputTopic,
25                                 outputTopic);
26
27                 streams = new KafkaStreams(topology, props);
28         }
29
30         static Topology buildTopology(
31                         String inputTopic,
32                         String outputTopic)
33         {
34                 StreamsBuilder builder = new StreamsBuilder();
35
36                 builder
37                                 .<Key, Entry>stream(inputTopic)
38                                 .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
39                                 .groupByKey()
40                                 .aggregate(
41                                                 () -> new Ranking(),
42                                                 (user, entry, ranking) ->ranking.add(entry))
43                                 .toStream()
44                                 .to(outputTopic);
45
46                 Topology topology = builder.build();
47                 log.info("\n\n{}", topology.describe());
48
49                 return topology;
50         }
51
52         public void start()
53         {
54                 log.info("Starting Stream-Processor");
55                 streams.start();
56         }
57
58         public void stop()
59         {
60                 log.info("Stopping Stream-Processor");
61                 streams.close();
62         }
63 }