@Slf4j
public class QueryStreamProcessor
{
+ public static final String STATS_TYPE = "COUNTER";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
.<Key, Ranking>stream(rankingInputTopic)
- .map((key, value) -> new KeyValue<>(key.getUser(), value));
+ .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+ .map((key, value) -> new KeyValue<>(key.getChannel(), value));
rankings
.join(users, (ranking, user) -> UserRanking.of(