WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.KafkaStreams;
5 import org.apache.kafka.streams.KeyValue;
6 import org.apache.kafka.streams.StreamsBuilder;
7
8 import java.util.Properties;
9
10
11 @Slf4j
12 public class Top10StreamProcessor
13 {
14         public final KafkaStreams streams;
15
16
17         public Top10StreamProcessor(
18                         String inputTopic,
19                         String outputTopic,
20                         Properties props)
21         {
22                 StreamsBuilder builder = new StreamsBuilder();
23
24                 builder
25                                 .<Key, Counter>stream(inputTopic)
26                                 .map((key, counter) ->
27                                 {
28                                         Entry entry = Entry.of(key.getWord(), counter.getCounter());
29                                         return new KeyValue<>(key.getUser(), entry);
30                                 })
31                                 .groupByKey()
32                                 .aggregate(
33                                                 () -> new Ranking(),
34                                                 (user, entry, ranking) ->
35                                                 {
36                                                         ranking.add(entry);
37                                                         return ranking;
38                                                 }
39                                 )
40                                 .toStream()
41                                 .to(outputTopic);
42
43                 streams = new KafkaStreams(builder.build(), props);
44         }
45
46         public void start()
47         {
48                 log.info("Starting Stream-Processor");
49                 streams.start();
50         }
51
52         public void stop()
53         {
54                 log.info("Stopping Stream-Processor");
55                 streams.close();
56         }
57 }