X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationConfiguriation.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationConfiguriation.java;h=f1da4078b24ac301072d8182975541cfed1fc21b;hb=e1ec32bcc70e5f990ca2ddb5566f98cee7e4c2c2;hp=0000000000000000000000000000000000000000;hpb=b8f6728e1957d3cfdd34efebd430e33797b67b58;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java new file mode 100644 index 0000000..f1da407 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -0,0 +1,81 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(CounterApplicationProperties.class) +@Slf4j +public class CounterApplicationConfiguriation +{ + @Bean + public Properties propertyMap(CounterApplicationProperties properties) + { + Properties propertyMap = new Properties(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); + if (properties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties properties, + Properties propertyMap, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper objectMapper, + ConfigurableApplicationContext context) + { + CounterStreamProcessor streamProcessor = new CounterStreamProcessor( + properties.getInputTopic(), + properties.getOutputTopic(), + propertyMap, + storeSupplier, + objectMapper); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("counter"); + } +}