X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationConfiguration.java;h=2ece744facc6e7ee3c8db840d213488e22f6bc78;hb=f18423d411650c6f08c9b698b92c33c42bdd670f;hp=7da171227a67dcd2472b6908435a2a8acb6bd976;hpb=278c7b8125c82120e1d80fa640bd16d375d4bf86;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 7da1712..2ece744 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -22,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -69,6 +70,13 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; @@ -80,7 +88,6 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! props.put( JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + @@ -96,7 +103,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -104,7 +112,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -121,8 +130,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } }