1 package de.juplo.kafka.wordcount.popular;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.kstream.*;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.QueryableStoreTypes;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
11 import org.springframework.kafka.support.serializer.JsonSerde;
12 import org.springframework.kafka.support.serializer.JsonSerializer;
14 import java.time.Duration;
16 import java.util.Properties;
17 import java.util.stream.Collectors;
21 public class PopularStreamProcessor
23 public static final String TYPE = "POPULAR";
24 public static final String KEY_VALUE_STORE_NAME = "popular";
25 public static final String WINDOW_STORE_NAME = "popular-windows";
26 public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
29 public final KafkaStreams streams;
32 public PopularStreamProcessor(
35 Properties properties,
36 WindowBytesStoreSupplier windowBytesStoreSupplier,
37 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
39 Topology topology = PopularStreamProcessor.buildTopology(
42 windowBytesStoreSupplier,
43 keyValueBytesStoreSupplier);
45 streams = new KafkaStreams(topology, properties);
48 static Topology buildTopology(
51 WindowBytesStoreSupplier windowBytesStoreSupplier,
52 KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
54 StreamsBuilder builder = new StreamsBuilder();
57 .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
58 .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
59 .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
60 .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
62 .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
65 .<Word, Long>as(windowBytesStoreSupplier)
66 .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
67 .withValueSerde(Serdes.Long()))
68 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
70 .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
71 .map((windowedWord, counter) -> new KeyValue<>(
73 Long.toString(windowedWord.window().startTime().getEpochSecond()),
74 windowedWord.key().getWord()),
75 WordCounter.of(windowedWord.key().getWord(), counter)))
76 .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
79 .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
80 .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
81 .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
83 .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
84 .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
86 Topology topology = builder.build();
87 log.info("\n\n{}", topology.describe());
92 ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
94 return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
99 log.info("Starting Stream-Processor");
105 log.info("Stopping Stream-Processor");
111 public static JsonSerde<User> inKeySerde()
113 return new JsonSerde<>(User.class);
116 public static JsonSerde<Word> inValueSerde()
118 return new JsonSerde<>(Word.class);
121 public static JsonSerde<WindowedWord> outKeySerde()
126 public static JsonSerde<WordCounter> outValueSerde()
131 public static <T> JsonSerde<T> serde(boolean isKey)
133 JsonSerde<T> serde = new JsonSerde<>();
135 Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
140 private static String typeMappingsConfig()
142 return typeMappingsConfig(WindowedWord.class, WordCounter.class);
145 public static String typeMappingsConfig(Class keyClass, Class counterClass)
149 "counter", counterClass)
152 .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
153 .collect(Collectors.joining(","));