From: Kai Moritz Date: Mon, 17 Jun 2024 20:45:34 +0000 (+0200) Subject: popular: 1.1.0 - Implemented supression of intermediate results X-Git-Tag: popular-1.1.0 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=feb9d86212983d93d3546f5a4d9b15f14cd57314;p=demos%2Fkafka%2Fwordcount popular: 1.1.0 - Implemented supression of intermediate results --- diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 788ae20..9bcf0f7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -3,10 +3,7 @@ package de.juplo.kafka.wordcount.popular; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -72,6 +69,7 @@ public class PopularStreamProcessor .as(windowBytesStoreSupplier) .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo()) .withValueSerde(Serdes.Long())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream() .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter)) .map((windowedWord, counter) -> new KeyValue<>(