X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=31086ca9b5b0fffca94323e2d10892b24a1aee25;hb=b8f6728e1957d3cfdd34efebd430e33797b67b58;hp=1ee65ca16df5703d9c628e23993bd904647d24e1;hpb=9856b2a34d6e2bda771b83d2825c1db8e2a6916f;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 1ee65ca..31086ca 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -22,11 +24,8 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j public class CounterApplication { - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - ObjectMapper objectMapper, - ConfigurableApplicationContext context) + @Bean + public Properties propertyMap(CounterApplicationProperties properties) { Properties propertyMap = new Properties(); @@ -34,12 +33,29 @@ public class CounterApplication propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); + if (properties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties properties, + Properties propertyMap, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper objectMapper, + ConfigurableApplicationContext context) + { CounterStreamProcessor streamProcessor = new CounterStreamProcessor( properties.getInputTopic(), properties.getOutputTopic(), propertyMap, + storeSupplier, objectMapper); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> @@ -57,6 +73,12 @@ public class CounterApplication return streamProcessor; } + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("counter"); + } + public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args);