package de.juplo.kafka.wordcount.top10;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
public Top10StreamProcessor(
String inputTopic,
String outputTopic,
- ObjectMapper mapper,
Properties props)
{
StreamsBuilder builder = new StreamsBuilder();
builder
- .<Word, Counter>stream(inputTopic)
- .map((word, counter) ->
+ .<Key, Counter>stream(inputTopic)
+ .map((key, counter) ->
{
- Entry entry = Entry.of(word.getWord(), counter.getCounter());
- return new KeyValue<>(word.getUser(), entry);
+ Entry entry = Entry.of(key.getWord(), counter.getCounter());
+ return new KeyValue<>(key.getUser(), entry);
})
.groupByKey()
.aggregate(