props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
Properties streamProcessorProperties,
HostInfo applicationServer,
QueryApplicationProperties applicationProperties,
Properties streamProcessorProperties,
HostInfo applicationServer,
QueryApplicationProperties applicationProperties,
- KeyValueBytesStoreSupplier storeSupplier,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier,
ConfigurableApplicationContext context)
{
QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
ConfigurableApplicationContext context)
{
QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
applicationServer,
applicationProperties.getUsersInputTopic(),
applicationProperties.getRankingInputTopic(),
applicationServer,
applicationProperties.getUsersInputTopic(),
applicationProperties.getRankingInputTopic(),
- return Stores.persistentKeyValueStore(STORE_NAME);
+ return Stores.persistentKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(RANKING_STORE_NAME);