WIP
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:31:58 +0000 (06:31 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:31:58 +0000 (06:31 +0200)
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java

index f702268..6beb7f0 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Properties;
 @Slf4j
 public class StatsStreamProcessor
 {
-       public static final String STATS_TYPE = "COUNTER";
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
@@ -66,7 +65,6 @@ public class StatsStreamProcessor
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder
                                .<Key, Ranking>stream(rankingInputTopic)
-                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
                                .map((key, value) -> new KeyValue<>(key.getChannel(), value));
 
                rankings
index 1603619..50ed697 100644 (file)
@@ -10,14 +10,13 @@ import java.util.Arrays;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
-import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STATS_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
 class TestData
 {
-       static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
-       static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
+       static final TestUser PETER = TestUser.of(StatisticsType.POPULAR.name(), "peter");
+       static final TestUser KLAUS = TestUser.of(StatisticsType.POPULAR.name(), "klaus");
 
        static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
        {