@Slf4j
public class StatsStreamProcessor
{
- public static final String STATS_TYPE = "COUNTER";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
.<Key, Ranking>stream(rankingInputTopic)
- .filter((key, value) -> STATS_TYPE.equals(key.getType()))
.map((key, value) -> new KeyValue<>(key.getChannel(), value));
rankings
import java.util.function.Function;
import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STATS_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
- static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
+ static final TestUser PETER = TestUser.of(StatisticsType.POPULAR.name(), "peter");
+ static final TestUser KLAUS = TestUser.of(StatisticsType.POPULAR.name(), "klaus");
static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{