1 package de.juplo.kafka.wordcount.popular;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.*;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.QueryableStoreTypes;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
14 import java.time.Duration;
16 import java.util.Properties;
17 import java.util.stream.Collectors;
21 public class PopularStreamProcessor
23 public static final String KEY_VALUE_STORE_NAME = "popular";
24 public static final String WINDOW_STORE_NAME = "popular-windows";
25 public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
28 public final KafkaStreams streams;
31 public PopularStreamProcessor(
34 Properties properties,
35 WindowBytesStoreSupplier windowBytesStoreSupplier,
36 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
38 Topology topology = PopularStreamProcessor.buildTopology(
41 windowBytesStoreSupplier,
42 keyValueBytesStoreSupplier);
44 streams = new KafkaStreams(topology, properties);
47 static Topology buildTopology(
50 WindowBytesStoreSupplier windowBytesStoreSupplier,
51 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
53 StreamsBuilder builder = new StreamsBuilder();
56 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
57 .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
58 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
59 .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
61 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
64 .<Word, Long>as(windowBytesStoreSupplier)
65 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
66 .withValueSerde(Serdes.Long()))
67 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
69 .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
70 .map((windowedWord, counter) -> new KeyValue<>(
72 Long.toString(windowedWord.window().startTime().getEpochSecond()),
73 windowedWord.key().getWord()),
74 WordCounter.of(windowedWord.key().getWord(), counter)))
75 .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
78 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
79 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
80 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
82 .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
83 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
85 Topology topology = builder.build();
86 log.info("\n\n{}", topology.describe());
91 ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
93 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
98 log.info("Starting Stream-Processor");
104 log.info("Stopping Stream-Processor");
110 public static JsonSerde<User> inKeySerde()
112 return new JsonSerde<>(User.class);
115 public static JsonSerde<Word> inValueSerde()
117 return new JsonSerde<>(Word.class);
120 public static JsonSerde<WindowedWord> outKeySerde()
125 public static JsonSerde<WordCounter> outValueSerde()
130 public static <T> JsonSerde<T> serde(boolean isKey)
132 JsonSerde<T> serde = new JsonSerde<>();
134 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
139 private static String typeMappingsConfig()
141 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
144 public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
148 "counter", wordCounterClass)
151 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
152 .collect(Collectors.joining(","));