StreamsBuilder builder = new StreamsBuilder();
builder
- .<Key, Counter>stream(inputTopic)
- .map((key, counter) ->
- {
- Entry entry = Entry.of(key.getWord(), counter.getCounter());
- return new KeyValue<>(key.getUser(), entry);
- })
+ .<Key, Entry>stream(inputTopic)
+ .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
- (user, entry, ranking) ->
- {
- ranking.add(entry);
- return ranking;
- }
- )
+ (user, entry, ranking) ->ranking.add(entry))
.toStream()
.to(outputTopic);