1 package de.juplo.kafka.wordcount.popular;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.Consumed;
7 import org.apache.kafka.streams.kstream.Materialized;
8 import org.apache.kafka.streams.kstream.Produced;
9 import org.apache.kafka.streams.kstream.TimeWindows;
10 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
11 import org.apache.kafka.streams.state.QueryableStoreTypes;
12 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
13 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
17 import java.time.Duration;
18 import java.time.ZoneId;
19 import java.time.ZonedDateTime;
21 import java.util.Properties;
22 import java.util.stream.Collectors;
26 public class PopularStreamProcessor
28 public static final String KEY_VALUE_STORE_NAME = "popular";
29 public static final String WINDOW_STORE_NAME = "popular-windows";
30 public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
33 public final KafkaStreams streams;
36 public PopularStreamProcessor(
39 Properties properties,
41 WindowBytesStoreSupplier windowBytesStoreSupplier,
42 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
44 Topology topology = PopularStreamProcessor.buildTopology(
48 windowBytesStoreSupplier,
49 keyValueBytesStoreSupplier);
51 streams = new KafkaStreams(topology, properties);
54 static Topology buildTopology(
58 WindowBytesStoreSupplier windowBytesStoreSupplier,
59 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
61 StreamsBuilder builder = new StreamsBuilder();
64 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
65 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
67 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
70 .<Word, Long>as(windowBytesStoreSupplier)
71 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
72 .withValueSerde(Serdes.Long()))
74 .map((windowedWord, counter) -> new KeyValue<>(
76 ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
77 ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
78 windowedWord.key().getWord()),
79 WordCounter.of(windowedWord.key().getWord(), counter)))
82 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
83 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
84 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
86 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
88 Topology topology = builder.build();
89 log.info("\n\n{}", topology.describe());
94 ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
96 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
101 log.info("Starting Stream-Processor");
107 log.info("Stopping Stream-Processor");
113 public static JsonSerde<User> inKeySerde()
115 return new JsonSerde<>(User.class);
118 public static JsonSerde<Word> inValueSerde()
120 return new JsonSerde<>(Word.class);
123 public static JsonSerde<WindowedWord> outKeySerde()
128 public static JsonSerde<WordCounter> outValueSerde()
133 public static <T> JsonSerde<T> serde(boolean isKey)
135 JsonSerde<T> serde = new JsonSerde<>();
137 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
142 private static String typeMappingsConfig()
144 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
147 public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
151 "counter", wordCounterClass)
154 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
155 .collect(Collectors.joining(","));