X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationConfiguration.java;h=6c7844d823c88b549079639ea0d34d81d20fa703;hb=913c2c4ec0a3584f6be27c8341898d17c4260501;hp=7da171227a67dcd2472b6908435a2a8acb6bd976;hpb=278c7b8125c82120e1d80fa640bd16d375d4bf86;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 7da1712..6c7844d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -22,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -69,6 +70,13 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; @@ -80,12 +88,10 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! props.put( JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + "ranking:" + Ranking.class.getName() + "," + - "userdata:" + User.class.getName() + "," + "userranking:" + UserRanking.class.getName()); return props; @@ -96,7 +102,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -104,7 +111,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -121,8 +129,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } }