X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessor.java;h=0000000000000000000000000000000000000000;hb=4b94d31fbd663cb277276def106be9873ec4a246;hp=e8d7c116ffb8550a257955c825d878f7a6c4cb2b;hpb=02ab54c6b3c099f5b0bd420fc0a37034badf1c71;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index e8d7c11..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,97 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Slf4j -@Component -public class CounterStreamProcessor -{ - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - CounterApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream source = builder.stream(properties.getInputTopic()); - source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .count() - .mapValues(value->Long.toString(value)) - .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -}