@Slf4j
public class Top10StreamProcessor
{
+ public static final String STORE_NAME= "top10";
+
public final KafkaStreams streams;
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+ .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ ReadOnlyKeyValueStore<User, Ranking> getStore()
{
- return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
+ return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
}
public void start()