WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessor.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import jakarta.annotation.PostConstruct;
6 import jakarta.annotation.PreDestroy;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.ConsumerConfig;
9 import org.apache.kafka.common.serialization.Serdes;
10 import org.apache.kafka.streams.KafkaStreams;
11 import org.apache.kafka.streams.KeyValue;
12 import org.apache.kafka.streams.StreamsBuilder;
13 import org.apache.kafka.streams.StreamsConfig;
14 import org.springframework.boot.SpringApplication;
15 import org.springframework.context.ConfigurableApplicationContext;
16 import org.springframework.kafka.support.serializer.JsonDeserializer;
17 import org.springframework.kafka.support.serializer.JsonSerde;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.stereotype.Component;
20
21 import java.util.Properties;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.regex.Pattern;
24
25 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
26
27
28 @Slf4j
29 @Component
30 public class Top10StreamProcessor
31 {
32         final static Pattern PATTERN = Pattern.compile("\\W+");
33
34         public final KafkaStreams streams;
35
36
37         public Top10StreamProcessor(
38                         Top10ApplicationProperties properties,
39                         ObjectMapper mapper,
40                         ConfigurableApplicationContext context)
41         {
42                 StreamsBuilder builder = new StreamsBuilder();
43
44                 builder
45                                 .<String, String>stream(properties.getInputTopic())
46                                 .map((keyJson, countStr) ->
47                                 {
48                                         try
49                                         {
50                                                 Key key = mapper.readValue(keyJson, Key.class);
51                                                 Long count = Long.parseLong(countStr);
52                                                 Entry entry = Entry.of(key.getWord(), count);
53                                                 String entryJson = mapper.writeValueAsString(entry);
54                                                 return new KeyValue<>(key.getUsername(), entryJson);
55                                         }
56                                         catch (JsonProcessingException e)
57                                         {
58                                                 throw new RuntimeException(e);
59                                         }
60                                 })
61                                 .groupByKey()
62                                 .aggregate(
63                                                 () -> "{\"entries\"     : []}",
64                                                 (username, entryJson, rankingJson) ->
65                                                 {
66                                                         try
67                                                         {
68                                                                 Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
69                                                                 ranking.add(mapper.readValue(entryJson, Entry.class));
70                                                                 return mapper.writeValueAsString(ranking);
71                                                         }
72                                                         catch (JsonProcessingException e)
73                                                         {
74                                                                 throw new RuntimeException(e);
75                                                         }
76                                                 }
77                                 )
78                                 .toStream()
79                                 .to(properties.getOutputTopic());
80
81                 Properties props = new Properties();
82                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
83                 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
84                 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
85                 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
86                 props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
87                 props.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName());
88                 props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
89                 props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, WordCount.class.getName());
90                 props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
91                 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
92
93                 streams = new KafkaStreams(builder.build(), props);
94                 streams.setUncaughtExceptionHandler((Throwable e) ->
95                 {
96                         log.error("Unexpected error!", e);
97                         CompletableFuture.runAsync(() ->
98                         {
99                                 log.info("Stopping application...");
100                                 SpringApplication.exit(context, () -> 1);
101                         });
102                         return SHUTDOWN_CLIENT;
103                 });
104         }
105
106         @PostConstruct
107         public void start()
108         {
109                 log.info("Starting Stream-Processor");
110                 streams.start();
111         }
112
113         @PreDestroy
114         public void stop()
115         {
116                 log.info("Stopping Stream-Processor");
117                 streams.close();
118         }
119 }