From 11eeba55d369e14e0bef1f6f32a6efa4a570ef05 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 21:31:16 +0200 Subject: [PATCH] Activated supression of intermediate results and adapted expected result WIP:suppressed --- .../counter/AggregationTopologyTest.java | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 80e0f18..8b64042 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.SessionWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -38,7 +37,12 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.>as("aggregated-store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -62,57 +66,43 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63, 63), "A")); + assertThatOutcomeIs(); sendAt("B", 64); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63, 63), null), - KeyValue.pair(windowFor(63, 64), "A-B")); + assertThatOutcomeIs(); sendAt("C", 65); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63, 64), null), - KeyValue.pair(windowFor(63, 65), "A-B-C")); + assertThatOutcomeIs(); sendAt("D", 66); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63, 65), null), - KeyValue.pair(windowFor(63, 66), "A-B-C-D")); + assertThatOutcomeIs(); sendAt("E", 69); - assertThatOutcomeIs( - KeyValue.pair(windowFor(69, 69), "E")); + assertThatOutcomeIs(KeyValue.pair(windowFor(63, 66), "A-B-C-D")); sendAt("F", 70); - assertThatOutcomeIs( - KeyValue.pair(windowFor(69, 69), null), - KeyValue.pair(windowFor(69, 70), "E-F")); + assertThatOutcomeIs(); sendAt("G", 74); - assertThatOutcomeIs( - KeyValue.pair(windowFor(74, 74), "G")); + assertThatOutcomeIs(KeyValue.pair(windowFor(69, 70), "E-F")); sendAt("H", 75); - assertThatOutcomeIs( - KeyValue.pair(windowFor(74, 74), null), - KeyValue.pair(windowFor(74, 75), "G-H")); + assertThatOutcomeIs(); sendAt("I", 100); - assertThatOutcomeIs( - KeyValue.pair(windowFor(100, 100), "I")); + assertThatOutcomeIs(KeyValue.pair(windowFor(74, 75), "G-H")); sendAt("J", 120); - assertThatOutcomeIs( - KeyValue.pair(windowFor(120, 120), "J")); + assertThatOutcomeIs(KeyValue.pair(windowFor(100, 100), "I")); sendAt("K", 140); - assertThatOutcomeIs( - KeyValue.pair(windowFor(140, 140), "K")); + assertThatOutcomeIs(KeyValue.pair(windowFor(120, 120), "J")); sendAt("L", 160); - assertThatOutcomeIs( - KeyValue.pair(windowFor(160, 160), "L")); + assertThatOutcomeIs(KeyValue.pair(windowFor(140, 140), "K")); + + // Never received, if no newer message is sent + // KeyValue.pair(windowFor(160, 160), "L")); } -- 2.20.1