X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=e6d3b1fa5626878d157ce53bd074841b1de27a93;hb=e1ec32bcc70e5f990ca2ddb5566f98cee7e4c2c2;hp=31086ca9b5b0fffca94323e2d10892b24a1aee25;hpb=b8f6728e1957d3cfdd34efebd430e33797b67b58;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 31086ca..e6d3b1f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -1,84 +1,12 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j public class CounterApplication { - @Bean - public Properties propertyMap(CounterApplicationProperties properties) - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); - if (properties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return propertyMap; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - Properties propertyMap, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper objectMapper, - ConfigurableApplicationContext context) - { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - properties.getInputTopic(), - properties.getOutputTopic(), - propertyMap, - storeSupplier, - objectMapper); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore("counter"); - } - public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args);