import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
Properties streamProcessorProperties,
HostInfo applicationServer,
QueryApplicationProperties applicationProperties,
- KeyValueBytesStoreSupplier storeSupplier,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier,
ConfigurableApplicationContext context)
{
QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
applicationServer,
applicationProperties.getUsersInputTopic(),
applicationProperties.getRankingInputTopic(),
- storeSupplier);
+ userStoreSupplier,
+ rankingStoreSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
}
@Bean
- public KeyValueBytesStoreSupplier storeSupplier()
+ public KeyValueBytesStoreSupplier userStoreSupplier()
{
- return Stores.persistentKeyValueStore(STORE_NAME);
+ return Stores.persistentKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
}
}