X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationConfiguriation.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationConfiguriation.java;h=738bc989d4e554382bcf006a282ead9434caa368;hb=8dec693aef633b7b8097780f527039e697a1a9b9;hp=3878dbacf9f8901e8f9432cdb1231aa8b1d677bd;hpb=7efbc8c73e3f2bb1f553a61325b8576886bab254;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 3878dba..738bc98 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -31,11 +31,13 @@ public class CounterApplicationConfiguriation Properties propertyMap = serializationConfig(); propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); if (counterProperties.getCommitInterval() != null) propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); if (counterProperties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes()); + propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propertyMap;