From 95e8eba2a7f24bd13fac9dfb6165e54b35dab480 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 16:33:34 +0200 Subject: [PATCH] Activated supression of intermediate results and adapted expected result --- .../counter/AggregationTopologyTest.java | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 85fb265..a650311 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -38,7 +37,12 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.>as("aggregated-store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -62,40 +66,43 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A")); + assertThatOutcomeIs(); sendAt("B", 64); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B")); + assertThatOutcomeIs(); sendAt("C", 65); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C")); + assertThatOutcomeIs(); sendAt("D", 66); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D")); + assertThatOutcomeIs(); sendAt("E", 69); - assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); + assertThatOutcomeIs(); sendAt("F", 70); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F")); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); sendAt("G", 74); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G")); + assertThatOutcomeIs(); sendAt("H", 75); - assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); + assertThatOutcomeIs(); sendAt("I", 100); - assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); sendAt("J", 120); - assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); + assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); sendAt("K", 140); - assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); + assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); sendAt("L", 160); - assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L")); + assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); + + // Never received, if no newer message is send + // KeyValue.pair(windowFor(160), "L") } -- 2.20.1