From feb9d86212983d93d3546f5a4d9b15f14cd57314 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 17 Jun 2024 22:45:34 +0200 Subject: [PATCH] popular: 1.1.0 - Implemented supression of intermediate results --- .../kafka/wordcount/popular/PopularStreamProcessor.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java index 788ae20..9bcf0f7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -3,10 +3,7 @@ package de.juplo.kafka.wordcount.popular; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -72,6 +69,7 @@ public class PopularStreamProcessor .as(windowBytesStoreSupplier) .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo()) .withValueSerde(Serdes.Long())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream() .peek((windowedWord, counter) -> log.info("windowed: {} -> {}", windowedWord, counter)) .map((windowedWord, counter) -> new KeyValue<>( -- 2.20.1