1 package de.juplo.kafka.wordcount.top10;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.streams.KafkaStreams;
6 import org.apache.kafka.streams.KeyValue;
7 import org.apache.kafka.streams.StreamsBuilder;
9 import java.util.Properties;
13 public class Top10StreamProcessor
15 public final KafkaStreams streams;
18 public Top10StreamProcessor(
24 StreamsBuilder builder = new StreamsBuilder();
27 .<Word, WordCount>stream(inputTopic)
28 .map((word, wordCount) ->
30 Entry entry = Entry.of(word.getWord(), wordCount.getCount());
31 return new KeyValue<>(word.getUser(), entry);
36 (user, entry, ranking) ->
45 streams = new KafkaStreams(builder.build(), props);
50 log.info("Starting Stream-Processor");
56 log.info("Stopping Stream-Processor");