WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.streams.KafkaStreams;
6 import org.apache.kafka.streams.KeyValue;
7 import org.apache.kafka.streams.StreamsBuilder;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class Top10StreamProcessor
14 {
15         public final KafkaStreams streams;
16
17
18         public Top10StreamProcessor(
19                         String inputTopic,
20                         String outputTopic,
21                         ObjectMapper mapper,
22                         Properties props)
23         {
24                 StreamsBuilder builder = new StreamsBuilder();
25
26                 builder
27                                 .<Word, WordCount>stream(inputTopic)
28                                 .map((word, wordCount) ->
29                                 {
30                                         Entry entry = Entry.of(word.getWord(), wordCount.getCount());
31                                         return new KeyValue<>(word.getUser(), entry);
32                                 })
33                                 .groupByKey()
34                                 .aggregate(
35                                                 () -> new Ranking(),
36                                                 (user, entry, ranking) ->
37                                                 {
38                                                         ranking.add(entry);
39                                                         return ranking;
40                                                 }
41                                 )
42                                 .toStream()
43                                 .to(outputTopic);
44
45                 streams = new KafkaStreams(builder.build(), props);
46         }
47
48         public void start()
49         {
50                 log.info("Starting Stream-Processor");
51                 streams.start();
52         }
53
54         public void stop()
55         {
56                 log.info("Stopping Stream-Processor");
57                 streams.close();
58         }
59 }