X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationConfiguriation.java;h=738bc989d4e554382bcf006a282ead9434caa368;hb=8dec693aef633b7b8097780f527039e697a1a9b9;hp=e38a0f88dac159bd2ed261f45ce2bf08cf387667;hpb=3a0dbf30eaee8aade255360217d7ab6049a95041;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index e38a0f8..738bc98 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -31,11 +31,13 @@ public class CounterApplicationConfiguriation Properties propertyMap = serializationConfig(); propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); if (counterProperties.getCommitInterval() != null) propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); if (counterProperties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes()); + propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propertyMap; @@ -53,7 +55,6 @@ public class CounterApplicationConfiguriation JsonDeserializer.TYPE_MAPPINGS, "word:" + Word.class.getName() + "," + "counter:" + WordCounter.class.getName()); - propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); return propertyMap; }