query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests)
authorKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 20:46:24 +0000 (22:46 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 13 Jun 2024 15:28:59 +0000 (17:28 +0200)
* Introduced a second state-store to store the incomming users-table.
* Without the explicit definition of the state-store, it is _not_ possible,
  to reconfigure the integration-test in such a way, taht it does not
  store its state locally on disk.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java

index 07b78e4..3bf8326 100644 (file)
@@ -22,7 +22,8 @@ import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -95,7 +96,8 @@ public class QueryApplicationConfiguration
                        Properties streamProcessorProperties,
                        HostInfo applicationServer,
                        QueryApplicationProperties applicationProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier,
                        ConfigurableApplicationContext context)
        {
                QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
@@ -103,7 +105,8 @@ public class QueryApplicationConfiguration
                                applicationServer,
                                applicationProperties.getUsersInputTopic(),
                                applicationProperties.getRankingInputTopic(),
-                               storeSupplier);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
                {
@@ -120,8 +123,14 @@ public class QueryApplicationConfiguration
        }
 
        @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
+       public KeyValueBytesStoreSupplier userStoreSupplier()
        {
-               return Stores.persistentKeyValueStore(STORE_NAME);
+               return Stores.persistentKeyValueStore(USER_STORE_NAME);
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier rankingStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
        }
 }
index 7dacd4b..4749264 100644 (file)
@@ -22,7 +22,8 @@ import java.util.Properties;
 @Slf4j
 public class QueryStreamProcessor
 {
-       public static final String STORE_NAME = "rankings-by-username";
+       public static final String USER_STORE_NAME = "users";
+       public static final String RANKING_STORE_NAME = "rankings";
 
        public final KafkaStreams streams;
        public final HostInfo hostInfo;
@@ -34,27 +35,34 @@ public class QueryStreamProcessor
                        HostInfo applicationServer,
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                Topology topology = buildTopology(
                                usersInputTopic,
                                rankingInputTopic,
-                               storeSupplier);
+                               userStoreSupplier,
+                               rankingStoreSupplier);
                streams = new KafkaStreams(topology, props);
                hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
        }
 
        static Topology buildTopology(
                        String usersInputTopic,
                        String rankingInputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                KTable<String, User> users = builder
                                .stream(usersInputTopic)
-                               .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class)));
+                               .toTable(
+                                               Materialized
+                                                               .<String, User>as(userStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
 
                rankings
@@ -64,7 +72,7 @@ public class QueryStreamProcessor
                                                ranking.getEntries()))
                                .toTable(
                                                Materialized
-                                                               .<String, UserRanking>as(storeSupplier)
+                                                               .<String, UserRanking>as(rankingStoreSupplier)
                                                                .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
 
                Topology topology = builder.build();
@@ -80,7 +88,7 @@ public class QueryStreamProcessor
 
        public Optional<URI> getRedirect(String username)
        {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
 
index a9cca10..d800fbd 100644 (file)
@@ -12,7 +12,6 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
 import org.springframework.http.MediaType;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
@@ -24,13 +23,15 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 
 @SpringBootTest(
                properties = {
+                               "spring.main.allow-bean-definition-overriding=true",
                                "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
@@ -129,11 +130,16 @@ public class QueryApplicationIT
        @TestConfiguration
        static class Configuration
        {
-               @Primary
                @Bean
-               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               KeyValueBytesStoreSupplier userStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+                       return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier rankingStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
                }
        }
 }
index 845792c..1a857b7 100644 (file)
@@ -24,7 +24,8 @@ public class QueryStreamProcessorTopologyTest
 {
   public static final String TOP10_IN = "TOP10-IN";
   public static final String USERS_IN = "USERS-IN";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
+  public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
+  public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
 
 
   TopologyTestDriver testDriver;
@@ -38,7 +39,8 @@ public class QueryStreamProcessorTopologyTest
     Topology topology = QueryStreamProcessor.buildTopology(
         USERS_IN,
         TOP10_IN,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
+        Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+        Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest
         .getTop10Messages()
         .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
 
-    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
     TestData.assertExpectedState(user -> store.get(user));
   }