From: Kai Moritz Date: Mon, 8 Jul 2024 14:33:34 +0000 (+0200) Subject: Activated supression of intermediate results and adapted expected result X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=95e8eba2a7f24bd13fac9dfb6165e54b35dab480;p=demos%2Fkafka%2Fwordcount Activated supression of intermediate results and adapted expected result --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 85fb265..a650311 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -38,7 +37,12 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.>as("aggregated-store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -62,40 +66,43 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A")); + assertThatOutcomeIs(); sendAt("B", 64); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B")); + assertThatOutcomeIs(); sendAt("C", 65); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C")); + assertThatOutcomeIs(); sendAt("D", 66); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D")); + assertThatOutcomeIs(); sendAt("E", 69); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); + assertThatOutcomeIs(); sendAt("F", 70); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F")); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); sendAt("G", 74); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G")); + assertThatOutcomeIs(); sendAt("H", 75); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); + assertThatOutcomeIs(); sendAt("I", 100); - assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); sendAt("J", 120); - assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); + assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); sendAt("K", 140); - assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); + assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); sendAt("L", 160); - assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L")); + assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); + + // Never received, if no newer message is send + // KeyValue.pair(windowFor(160), "L") }