package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
@Slf4j
public class PopularStreamProcessor
{
- public static final String STORE_NAME = "popular";
+ public static final String KEY_VALUE_STORE_NAME = "popular";
+ public static final String WINDOW_STORE_NAME = "popular-windows";
+ public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
public final KafkaStreams streams;
String inputTopic,
String outputTopic,
Properties properties,
- KeyValueBytesStoreSupplier storeSupplier)
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
- storeSupplier);
+ windowBytesStoreSupplier,
+ keyValueBytesStoreSupplier);
streams = new KafkaStreams(topology, properties);
}
static Topology buildTopology(
String inputTopic,
String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
- .map((key, word) -> new KeyValue<>(word, word))
+ .peek((user, userWord) -> log.info("{}: {} -> {}", inputTopic, user, userWord))
+ .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
+ .peek((key, value) -> log.info("mapped: {} -> {}", key, value))
.groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
.count(
Materialized
- .<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+ .<Word, Long>as(windowBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+ .withValueSerde(Serdes.Long()))
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
- .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
+ .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter))
+ .map((windowedWord, counter) -> new KeyValue<>(
+ WindowedWord.of(
+ Long.toString(windowedWord.window().startTime().getEpochSecond()),
+ windowedWord.key().getWord()),
+ WordCounter.of(windowedWord.key().getWord(), counter)))
+ .peek((windowedWord, wordCounter) -> log.info("results: {} -> {}", windowedWord, wordCounter))
+ .toTable(
+ Materialized
+ .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+ .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
+ .toStream()
+ .peek((windowedWord, wordCounter) -> log.info("output: {} -> {}", windowedWord, wordCounter))
.to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
Topology topology = builder.build();
return topology;
}
- ReadOnlyKeyValueStore<Word, Long> getStore()
+ ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
{
- return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+ return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
}
public void start()
return new JsonSerde<>(Word.class);
}
- public static JsonSerde<Word> outKeySerde()
+ public static JsonSerde<WindowedWord> outKeySerde()
{
return serde(true);
}
private static String typeMappingsConfig()
{
- return typeMappingsConfig(Word.class, WordCounter.class);
+ return typeMappingsConfig(WindowedWord.class, WordCounter.class);
}
public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)