f0a7d191978d2b9eef1169632ca59ce858664261
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.StreamsBuilder;
9
10 import java.util.Properties;
11
12
13 @Slf4j
14 public class Top10StreamProcessor
15 {
16         public final KafkaStreams streams;
17
18
19         public Top10StreamProcessor(
20                         String inputTopic,
21                         String outputTopic,
22                         ObjectMapper mapper,
23                         Properties props)
24         {
25                 StreamsBuilder builder = new StreamsBuilder();
26
27                 builder
28                                 .<String, String>stream(inputTopic)
29                                 .map((keyJson, countStr) ->
30                                 {
31                                         try
32                                         {
33                                                 Key key = mapper.readValue(keyJson, Key.class);
34                                                 Long count = Long.parseLong(countStr);
35                                                 Entry entry = Entry.of(key.getWord(), count);
36                                                 String entryJson = mapper.writeValueAsString(entry);
37                                                 return new KeyValue<>(key.getUsername(), entryJson);
38                                         }
39                                         catch (JsonProcessingException e)
40                                         {
41                                                 throw new RuntimeException(e);
42                                         }
43                                 })
44                                 .groupByKey()
45                                 .aggregate(
46                                                 () -> "{\"entries\"     : []}",
47                                                 (username, entryJson, rankingJson) ->
48                                                 {
49                                                         try
50                                                         {
51                                                                 Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
52                                                                 ranking.add(mapper.readValue(entryJson, Entry.class));
53                                                                 return mapper.writeValueAsString(ranking);
54                                                         }
55                                                         catch (JsonProcessingException e)
56                                                         {
57                                                                 throw new RuntimeException(e);
58                                                         }
59                                                 }
60                                 )
61                                 .toStream()
62                                 .to(outputTopic);
63
64                 streams = new KafkaStreams(builder.build(), props);
65         }
66
67         public void start()
68         {
69                 log.info("Starting Stream-Processor");
70                 streams.start();
71         }
72
73         public void stop()
74         {
75                 log.info("Stopping Stream-Processor");
76                 streams.close();
77         }
78 }