- StreamsBuilder builder = new StreamsBuilder();
-
- builder
- .<String, String>stream(properties.getInputTopic())
- .map((keyJson, countStr) ->
- {
- try
- {
- Key key = mapper.readValue(keyJson, Key.class);
- Long count = Long.parseLong(countStr);
- Entry entry = Entry.of(key.getWord(), count);
- String entryJson = mapper.writeValueAsString(entry);
- return new KeyValue<>(key.getUsername(), entryJson);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
- .groupByKey()
- .aggregate(
- () -> "{\"entries\" : []}",
- (username, entryJson, rankingJson) ->
- {
- try
- {
- Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
- ranking.add(mapper.readValue(entryJson, Entry.class));
- return mapper.writeValueAsString(ranking);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- }
- )
- .toStream()
- .to(properties.getOutputTopic());
-