WIP popular-on-query
authorKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 09:57:03 +0000 (11:57 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 09:57:03 +0000 (11:57 +0200)
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java

index 1997d4e..e52d1c9 100644 (file)
@@ -6,6 +6,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -18,6 +19,7 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
@@ -107,8 +109,7 @@ public class PopularApplicationConfiguration
                PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
                                streamProcessorProperties,
                                applicationServer,
-                               applicationProperties.getUsersInputTopic(),
-                               applicationProperties.getRankingInputTopic(),
+                               applicationProperties.getInputTopic(),
                                userStoreSupplier,
                                rankingStoreSupplier);
 
@@ -127,13 +128,17 @@ public class PopularApplicationConfiguration
        }
 
        @Bean
-       public KeyValueBytesStoreSupplier userStoreSupplier()
+       public WindowBytesStoreSupplier windowedBytesStoreSupplier()
        {
-               return Stores.persistentKeyValueStore(USER_STORE_NAME);
+               return Stores.persistentWindowStore(
+                               USER_STORE_NAME,
+                               Duration.ofMinutes(1),
+                               Duration.ofMinutes(1),
+                               false);
        }
 
        @Bean
-       public KeyValueBytesStoreSupplier rankingStoreSupplier()
+       public KeyValueBytesStoreSupplier keyValueByptesStoreSupplier()
        {
                return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
        }
index 1a3f120..8aa13b9 100644 (file)
@@ -5,7 +5,6 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.net.URI;
@@ -18,8 +17,8 @@ public class PopularController
 {
   private final PopularStreamProcessor processor;
 
-  @GetMapping("{username}")
-  ResponseEntity<Ranking> queryFor(@PathVariable String username)
+  @GetMapping
+  ResponseEntity<Ranking> getRanking()
   {
     Optional<URI> redirect = processor.getRedirect(username);
     if (redirect.isPresent())
index 4538c72..12f2cb5 100644 (file)
@@ -61,8 +61,8 @@ public class PopularStreamProcessor
                                .emitStrategy(EmitStrategy.onWindowClose())
                                .aggregate(
                                                () -> new Ranking(),
-                                               (word, counter, ranking) -> ranking,
-                                               Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
+                                               (word, counter, ranking) -> ranking) //,
+                               //              Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
                                .toStream()
                                .map((windowedWord, ranking) ->
                                {
index bf172e0..e953817 100644 (file)
@@ -74,7 +74,7 @@ public class PopularApplicationIT
                                .getUsersMessages()
                                .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
                TestData
-                               .getTop10Messages()
+                               .getMessages()
                                .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
        }
 
index 32fb0bf..ddbdea0 100644 (file)
@@ -2,9 +2,9 @@ package de.juplo.kafka.wordcount.popular;
 
 import de.juplo.kafka.wordcount.counter.TestRanking;
 import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.counter.TestWordCounter;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -15,6 +15,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.time.Duration;
 import java.util.Map;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig;
@@ -30,8 +31,7 @@ public class PopularStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<TestWord, TestRanking> top10In;
-  TestInputTopic<String, TestUserData> userIn;
+  TestInputTopic<TestWord, TestWordCounter> in;
 
 
   @BeforeEach
@@ -39,21 +39,19 @@ public class PopularStreamProcessorTopologyTest
   {
     Topology topology = PopularStreamProcessor.buildTopology(
         USERS_IN,
-        TOP10_IN,
-        Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+        Stores.inMemoryWindowStore(
+            USERS_STORE_NAME,
+            Duration.ofSeconds(1),
+            Duration.ofSeconds(1),
+            false),
         Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
-    top10In = testDriver.createInputTopic(
+    in = testDriver.createInputTopic(
         TOP10_IN,
         jsonSerializer(TestWord.class, true),
-        jsonSerializer(TestRanking.class,false));
-
-    userIn = testDriver.createInputTopic(
-        USERS_IN,
-        new StringSerializer(),
-        jsonSerializer(TestUserData.class, false).noTypeInfo());
+        jsonSerializer(TestWordCounter.class,false));
   }
 
 
@@ -64,8 +62,8 @@ public class PopularStreamProcessorTopologyTest
         .getUsersMessages()
         .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
     TestData
-        .getTop10Messages()
-        .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+        .getMessages()
+        .forEach(kv -> in.pipeInput(kv.key, kv.value));
 
     KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
     TestData.assertExpectedState(user -> store.get(user));
index 2d95897..cce6055 100644 (file)
@@ -18,37 +18,32 @@ class TestData
        static final TestWord PETER = TestWord.of("peter");
        static final TestWord KLAUS = TestWord.of("klaus");
 
-       static final Stream<KeyValue<TestWord, TestWordCounter>> getTop10Messages()
+       static final Stream<KeyValue<TestWord, TestWordCounter>> getMessages()
        {
                return Stream.of(TOP10_MESSAGES);
        }
 
-       static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
-       {
-               return Stream.of(USERS_MESSAGES);
-       }
-
-       static void assertExpectedState(Function<String, UserRanking> function)
+       static void assertExpectedState(Function<String, Ranking> function)
        {
                assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser()));
                assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser()));
        }
 
-       private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
+       private static void assertRankingEqualsRankingFromLastMessage(String user, Ranking rankingJson)
        {
                assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
        }
 
-       private static UserRanking getLastMessageFor(String user)
+       private static Ranking getLastMessageFor(String user)
        {
-               return getTop10Messages()
+               return getMessages()
                                .filter(kv -> kv.key.getUser().equals(user))
                                .map(kv -> kv.value)
                                .map(testRanking -> userRankingFor(user, testRanking))
                                .reduce(null, (left, right) -> right);
        }
 
-       private static UserRanking userRankingFor(String user, TestRanking testRanking)
+       private static Ranking userRankingFor(String user, Ranking testRanking)
        {
                TestUserData testUserData = getUsersMessages()
                                .filter(kv -> kv.key.equals(user))