X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=5fc183ccebbb9b0ed685c9086b7d001c45e45a5e;hb=db4b181f863add3e78c01d110c1b4feabd8ecb89;hp=6a6a78cf943897e80c90dfcfa5e5c704358e1fd7;hpb=8a56919f493faa0acc6eb939b9c78312a1418efa;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 6a6a78c..5fc183c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -35,8 +35,10 @@ public class CounterApplication propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + if (properties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); CounterStreamProcessor streamProcessor = new CounterStreamProcessor(