1 package de.juplo.kafka.wordcount.top10;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.StreamsBuilder;
10 import java.util.Properties;
// NOTE(review): this listing carries embedded line numbers with gaps (e.g. 16 -> 19, 29 -> 33,
// 41 -> 46, 57 -> 64), so braces, the constructor parameter list, parts of the stream pipeline
// and the method headers around the log statements are not visible here. The comments below
// describe only the statements that are shown; anything about hidden lines is marked as an
// assumption.
//
// Builds a Kafka Streams topology in its constructor and keeps the resulting KafkaStreams
// instance in a public final field.
14 public class Top10StreamProcessor
// The KafkaStreams instance created at the end of the constructor; assigned exactly once.
16 public final KafkaStreams streams;
// Constructor. The parameter list is not visible; the body uses inputTopic, mapper and props,
// which are presumably constructor parameters -- TODO confirm against the full file.
19 public Top10StreamProcessor(
// Topology builder; its build() result is handed to KafkaStreams further down.
25 StreamsBuilder builder = new StreamsBuilder();
// Consumes inputTopic as a stream whose keys and values are both typed as String.
28 .<String, String>stream(inputTopic)
// Re-keys every record: the incoming key is JSON text, the incoming value is a count as text.
29 .map((keyJson, countStr) ->
// Deserialize the JSON key into a Key object.
33 Key key = mapper.readValue(keyJson, Key.class);
// Parse the textual count. Long.parseLong throws NumberFormatException on malformed input,
// which is not covered by the JsonProcessingException catch shown below.
34 Long count = Long.parseLong(countStr);
// Combine the word taken from the key with the parsed count into an Entry.
35 Entry entry = Entry.of(key.getWord(), count);
// Serialize the Entry back to JSON so it can travel as a String value.
36 String entryJson = mapper.writeValueAsString(entry);
// New record: key is the username taken from the Key, value is the Entry as JSON.
37 return new KeyValue<>(key.getUsername(), entryJson);
// Jackson's checked exception cannot escape the lambda, so it is rethrown unchecked
// with the original exception preserved as the cause.
39 catch (JsonProcessingException e)
41 throw new RuntimeException(e);
// Supplier of the initial JSON document: an object holding an empty "entries" array.
// Presumably this and the following lambda are the initializer and aggregator arguments of an
// aggregate(...) call on a line that is not shown -- TODO confirm.
46 () -> "{\"entries\" : []}",
// Three-argument lambda: username, the incoming Entry JSON and the current ranking JSON.
47 (username, entryJson, rankingJson) ->
// Deserialize the current ranking, add the deserialized incoming entry, serialize the result.
// What Ranking.add does with the entry (ordering, size limit) is defined outside this listing.
51 Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
52 ranking.add(mapper.readValue(entryJson, Entry.class));
53 return mapper.writeValueAsString(ranking);
// Same unchecked rethrow as in the map step above; the cause is preserved.
55 catch (JsonProcessingException e)
57 throw new RuntimeException(e);
// Create the KafkaStreams instance from the built topology and props; this statement only
// constructs it.
64 streams = new KafkaStreams(builder.build(), props);
// Info-level log message. The enclosing method header is not visible; `log` is presumably
// supplied by Lombok's @Slf4j (imported above, annotation line not shown) -- TODO confirm.
69 log.info("Starting Stream-Processor");
// Info-level log message; the enclosing method header and any following statement are not visible.
75 log.info("Stopping Stream-Processor");