1 package de.juplo.kafka.wordcount.top10;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import jakarta.annotation.PostConstruct;
6 import jakarta.annotation.PreDestroy;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.common.serialization.Serdes;
10 import org.apache.kafka.streams.KafkaStreams;
11 import org.apache.kafka.streams.KeyValue;
12 import org.apache.kafka.streams.StreamsBuilder;
13 import org.apache.kafka.streams.StreamsConfig;
14 import org.springframework.boot.SpringApplication;
15 import org.springframework.context.ConfigurableApplicationContext;
16 import org.springframework.stereotype.Component;
18 import java.util.Properties;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.regex.Pattern;
22 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
/**
 * Builds a Kafka Streams topology and holds the resulting {@link KafkaStreams} instance.
 *
 * As far as the visible statements show: records are read from the configured input
 * topic, every record is turned into a (username, Entry-JSON) pair, entries are merged
 * into a JSON-serialized Ranking, and the result is written to the configured output topic.
 *
 * NOTE(review): several structural lines (braces, annotations, method headers, parts of
 * the stream chain) are not visible in this view. The comments below describe only the
 * statements that are visible; anything else is marked as an assumption.
 */
27 public class Top10ApplicationConfiguration
// Precompiled regex that matches runs of non-word characters.
// NOTE(review): no use of PATTERN is visible in this excerpt -- confirm it is still needed.
29 final static Pattern PATTERN = Pattern.compile("\\W+");
// The stream processor; assigned once in the constructor below.
31 public final KafkaStreams streams;
// Constructor: assembles the topology, the client configuration and the error handling.
// NOTE(review): `mapper` is used below but its declaration is not visible here --
// presumably an ObjectMapper constructor parameter; confirm.
34 public Top10ApplicationConfiguration(
35 Top10ApplicationProperties properties,
37 ConfigurableApplicationContext context)
39 StreamsBuilder builder = new StreamsBuilder();
// Source: String keys and String values read from the configured input topic.
42 .<String, String>stream(properties.getInputTopic())
// Re-key every record: the incoming key is JSON for a Key, the value is a count as text.
43 .map((keyJson, countStr) ->
47 Key key = mapper.readValue(keyJson, Key.class);
// NOTE(review): parseLong throws NumberFormatException on non-numeric input; that is
// not a JsonProcessingException, so the catch below does not cover it.
48 Long count = Long.parseLong(countStr);
49 Entry entry = Entry.of(key.getWord(), count);
50 String entryJson = mapper.writeValueAsString(entry);
// New key is the username taken from the Key, new value is the Entry serialized as JSON.
51 return new KeyValue<>(key.getUsername(), entryJson);
53 catch (JsonProcessingException e)
// Rethrown unchecked (cause preserved) because the lambda cannot throw checked exceptions.
55 throw new RuntimeException(e);
// Initial aggregate value: JSON with an empty "entries" array.
// NOTE(review): the grouping/aggregate call these two lambdas are passed to is not
// visible -- presumably an aggregation per username; confirm.
60 () -> "{\"entries\" : []}",
// Merge step: deserialize the current ranking, add the new entry, serialize it again.
61 (username, entryJson, rankingJson) ->
65 Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
66 ranking.add(mapper.readValue(entryJson, Entry.class));
67 return mapper.writeValueAsString(ranking);
69 catch (JsonProcessingException e)
71 throw new RuntimeException(e);
// Sink: results are written to the configured output topic.
76 .to(properties.getOutputTopic());
// Client configuration: application id and bootstrap servers come from the properties
// object, String serdes are the default for keys and values, and the consumer offset
// reset policy is set to "earliest".
78 Properties props = new Properties();
79 props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
80 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
81 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
82 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
83 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
85 streams = new KafkaStreams(builder.build(), props);
// Uncaught exceptions: log the error, trigger an exit of the Spring application with
// exit code 1 from a separate asynchronous task, and answer with SHUTDOWN_CLIENT.
86 streams.setUncaughtExceptionHandler((Throwable e) ->
88 log.error("Unexpected error!", e);
// runAsync hands the exit call to another thread, so the handler itself returns
// without waiting for the application context to close.
89 CompletableFuture.runAsync(() ->
91 log.info("Stopping application...");
92 SpringApplication.exit(context, () -> 1);
94 return SHUTDOWN_CLIENT;
// NOTE(review): the enclosing method headers of the next two statements are not visible --
// presumably the @PostConstruct / @PreDestroy lifecycle methods that start and close
// `streams`; confirm.
101 log.info("Starting Stream-Processor");
108 log.info("Stopping Stream-Processor");