From fc5d6c6ee08a4b2e29a045bf4071dd0a4d86bc0d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH] query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests) * Introduced a second state-store to store the incomming users-table. * Without the explicit definition of the state-store, it is _not_ possible, to reconfigure the integration-test in such a way, taht it does not store its state locally on disk. --- .../query/QueryApplicationConfiguration.java | 19 +++++++++++---- .../wordcount/query/QueryStreamProcessor.java | 24 ++++++++++++------- .../wordcount/query/QueryApplicationIT.java | 16 +++++++++---- .../QueryStreamProcessorTopologyTest.java | 8 ++++--- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 07b78e4..3bf8326 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -22,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -95,7 +96,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -103,7 +105,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -120,8 +123,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 7dacd4b..4749264 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -22,7 +22,8 @@ import java.util.Properties; @Slf4j public class QueryStreamProcessor { - public static final String STORE_NAME = "rankings-by-username"; + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; public final KafkaStreams streams; public final HostInfo hostInfo; @@ -34,27 +35,34 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { Topology topology = buildTopology( usersInputTopic, rankingInputTopic, - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; } static Topology buildTopology( String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); KTable users = builder .stream(usersInputTopic) - .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings @@ -64,7 +72,7 @@ public class QueryStreamProcessor ranking.getEntries())) .toTable( Materialized - .as(storeSupplier) + .as(rankingStoreSupplier) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); @@ -80,7 +88,7 @@ public class QueryStreamProcessor public Optional getRedirect(String username) { - KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); HostInfo activeHost = metadata.activeHost(); log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index a9cca10..d800fbd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -12,7 +12,6 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @@ -24,13 +23,15 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", @@ -129,11 +130,16 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 845792c..1a857b7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -24,7 +24,8 @@ public class QueryStreamProcessorTopologyTest { public static final String TOP10_IN = "TOP10-IN"; public static final String USERS_IN = "USERS-IN"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; TopologyTestDriver testDriver; @@ -38,7 +39,8 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME)); + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } -- 2.20.1