builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry))
+ .map((key, entry) -> new KeyValue<>(
+ Stats.of(key.getType(), key.getChannel()),
+ entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
- (user, entry, ranking) -> ranking.add(entry),
+ (stats, entry, ranking) -> ranking.add(entry),
Materialized.as(storeSupplier))
.toStream()
.to(outputTopic);
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore()
+ ReadOnlyKeyValueStore<Stats, Ranking> getStore()
{
return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
}