* Introduced a second state-store to store the incomming users-table.
* Without the explicit definition of the state-store, it is _not_ possible,
to reconfigure the integration-test in such a way, taht it does not
store its state locally on disk.
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
Properties streamProcessorProperties,
HostInfo applicationServer,
QueryApplicationProperties applicationProperties,
- KeyValueBytesStoreSupplier storeSupplier,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier,
ConfigurableApplicationContext context)
{
QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
applicationServer,
applicationProperties.getUsersInputTopic(),
applicationProperties.getRankingInputTopic(),
- storeSupplier);
+ userStoreSupplier,
+ rankingStoreSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
}
@Bean
- public KeyValueBytesStoreSupplier storeSupplier()
+ public KeyValueBytesStoreSupplier userStoreSupplier()
{
- return Stores.persistentKeyValueStore(STORE_NAME);
+ return Stores.persistentKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
}
}
@Slf4j
public class QueryStreamProcessor
{
- public static final String STORE_NAME = "rankings-by-username";
+ public static final String USER_STORE_NAME = "users";
+ public static final String RANKING_STORE_NAME = "rankings";
public final KafkaStreams streams;
public final HostInfo hostInfo;
HostInfo applicationServer,
String usersInputTopic,
String rankingInputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier)
{
Topology topology = buildTopology(
usersInputTopic,
rankingInputTopic,
- storeSupplier);
+ userStoreSupplier,
+ rankingStoreSupplier);
streams = new KafkaStreams(topology, props);
hostInfo = applicationServer;
- storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
+ storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
}
static Topology buildTopology(
String usersInputTopic,
String rankingInputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
KTable<String, User> users = builder
.stream(usersInputTopic)
- .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class)));
+ .toTable(
+ Materialized
+ .<String, User>as(userStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
rankings
ranking.getEntries()))
.toTable(
Materialized
- .<String, UserRanking>as(storeSupplier)
+ .<String, UserRanking>as(rankingStoreSupplier)
.withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
Topology topology = builder.build();
public Optional<URI> getRedirect(String username)
{
- KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
+ KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
HostInfo activeHost = metadata.activeHost();
log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest(
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
@TestConfiguration
static class Configuration
{
- @Primary
@Bean
- KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ KeyValueBytesStoreSupplier userStoreSupplier()
{
- return Stores.inMemoryKeyValueStore(STORE_NAME);
+ return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
}
}
}
{
public static final String TOP10_IN = "TOP10-IN";
public static final String USERS_IN = "USERS-IN";
- public static final String STORE_NAME = "TOPOLOGY-TEST";
+ public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
+ public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
TopologyTestDriver testDriver;
Topology topology = QueryStreamProcessor.buildTopology(
USERS_IN,
TOP10_IN,
- Stores.inMemoryKeyValueStore(STORE_NAME));
+ Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+ Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
.getTop10Messages()
.forEach(kv -> top10In.pipeInput(kv.key, kv.value));
- KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
TestData.assertExpectedState(user -> store.get(user));
}