084e4250582efdbf21127d24cfbe1d334a485bd8
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.KeyValue;
6 import org.apache.kafka.streams.StreamsBuilder;
7 import org.apache.kafka.streams.Topology;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class Top10StreamProcessor
14 {
15         public final KafkaStreams streams;
16
17
18         public Top10StreamProcessor(
19                         String inputTopic,
20                         String outputTopic,
21                         Properties props)
22         {
23                 Topology topology = Top10StreamProcessor.buildTopology(
24                                 inputTopic,
25                                 outputTopic);
26
27                 streams = new KafkaStreams(topology, props);
28         }
29
30         static Topology buildTopology(
31                         String inputTopic,
32                         String outputTopic)
33         {
34                 StreamsBuilder builder = new StreamsBuilder();
35
36                 builder
37                                 .<Key, Counter>stream(inputTopic)
38                                 .map((key, counter) ->
39                                 {
40                                         Entry entry = Entry.of(key.getWord(), counter.getCounter());
41                                         return new KeyValue<>(key.getUser(), entry);
42                                 })
43                                 .groupByKey()
44                                 .aggregate(
45                                                 () -> new Ranking(),
46                                                 (user, entry, ranking) ->
47                                                 {
48                                                         ranking.add(entry);
49                                                         return ranking;
50                                                 }
51                                 )
52                                 .toStream()
53                                 .to(outputTopic);
54
55                 Topology topology = builder.build();
56                 log.info("\n\n{}", topology.describe());
57
58                 return topology;
59         }
60
61         public void start()
62         {
63                 log.info("Starting Stream-Processor");
64                 streams.start();
65         }
66
67         public void stop()
68         {
69                 log.info("Stopping Stream-Processor");
70                 streams.close();
71         }
72 }