- .map((username, word) ->
- {
- try
- {
- String key = mapper.writeValueAsString(Key.of(username, word));
- return new KeyValue<>(key, word);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
- .groupByKey()
- .count(Materialized.as(storeSupplier))
- .mapValues(value->Long.toString(value))
+ .map((key, word) -> new KeyValue<>(word, word))
+ .groupByKey(Grouped.with(
+ new JsonSerde<>(Word.class)
+ .forKeys()
+ .noTypeInfo(),
+ new JsonSerde<>(Word.class)
+ .noTypeInfo()))
+ .count(Materialized
+ .<Word,Long>as(storeSupplier)
+ .withKeySerde(
+ new JsonSerde<>(Word.class)
+ .forKeys()
+ .noTypeInfo())
+ .withValueSerde(
+ Serdes.Long()))