From: Kai Moritz Date: Tue, 14 May 2024 21:04:50 +0000 (+0200) Subject: top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- ALIGN X-Git-Tag: top10-1.0.3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e3fc3fc028270a7faef487af51128771459aed20;p=demos%2Fkafka%2Fwordcount top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- ALIGN --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java index 27dca95..5c14ae7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java @@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.top10; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) public class Top10Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index cdf268f..3ea85b8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -1,89 +1,56 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +@Configuration +@EnableConfigurationProperties(Top10ApplicationProperties.class) @Slf4j -@Component public class Top10ApplicationConfiguration { - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public Top10ApplicationConfiguration( - Top10ApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) + @Bean + public Properties streamProcessorProperties(Top10ApplicationProperties properties) { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) - .toStream() - .to(properties.getOutputTopic()); - Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public Top10StreamProcessor streamProcessor( + Top10ApplicationProperties applicationProperties, + ObjectMapper objectMapper, + Properties streamProcessorProperties, + ConfigurableApplicationContext context) + { + Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + objectMapper, + streamProcessorProperties); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { log.error("Unexpected error!", e); CompletableFuture.runAsync(() -> @@ -93,19 +60,7 @@ public class Top10ApplicationConfiguration }); return SHUTDOWN_CLIENT; }); - } - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); + return streamProcessor; } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ae3ec5..f0a7d19 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -2,44 +2,30 @@ package de.juplo.kafka.wordcount.top10; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Slf4j -@Component public class Top10StreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; public Top10StreamProcessor( - Top10ApplicationProperties properties, + String inputTopic, + String outputTopic, ObjectMapper mapper, - ConfigurableApplicationContext context) + Properties props) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic()) + .stream(inputTopic) .map((keyJson, countStr) -> { try @@ -73,36 +59,17 @@ public class Top10StreamProcessor } ) .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + .to(outputTopic); streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); } - @PostConstruct public void start() { log.info("Starting Stream-Processor"); streams.start(); } - @PreDestroy public void stop() { log.info("Stopping Stream-Processor");