X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplication.java;h=e6d3b1fa5626878d157ce53bd074841b1de27a93;hb=dd4d5ddb60f7ca495d65d49f93eb4c07e0d03e22;hp=5fc183ccebbb9b0ed685c9086b7d001c45e45a5e;hpb=db4b181f863add3e78c01d110c1b4feabd8ecb89;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java index 5fc183c..e6d3b1f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java @@ -1,67 +1,12 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j public class CounterApplication { - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - ObjectMapper objectMapper, - ConfigurableApplicationContext context) - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); - if (properties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - properties.getInputTopic(), - properties.getOutputTopic(), - propertyMap, - objectMapper); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - public static void main(String[] args) { SpringApplication.run(CounterApplication.class, args);