From: Kai Moritz Date: Mon, 8 Jul 2024 14:33:34 +0000 (+0200) Subject: Activated supression of intermediate results and adapted expected result X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d5f20ac4b341db817468ca10e85710f7ffbd2268;p=demos%2Fkafka%2Fwordcount Activated supression of intermediate results and adapted expected result --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 95e082d..7efd751 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -40,6 +40,7 @@ public class AggregationTopologyTest .reduce( (aggregate, value) -> aggregate + "-" + value, Materialized.as(STORE_NAME)) + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -70,73 +71,55 @@ public class AggregationTopologyTest sendAt("B", 64); assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B"), KeyValue.pair(windowFor(54000), "A-B")); logStateStore(testDriver); sendAt("C", 65); assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B-C"), - KeyValue.pair(windowFor(64001), "C"), KeyValue.pair(windowFor(55000), "A-B-C")); logStateStore(testDriver); sendAt("D", 66); assertThatOutcomeIs( - KeyValue.pair(windowFor(64001), "C-D"), - KeyValue.pair(windowFor(63001), "B-C-D"), - KeyValue.pair(windowFor(65001), "D"), KeyValue.pair(windowFor(56000), "A-B-C-D")); logStateStore(testDriver); sendAt("E", 69); assertThatOutcomeIs( - KeyValue.pair(windowFor(65001), "D-E"), - KeyValue.pair(windowFor(64001), "C-D-E"), - KeyValue.pair(windowFor(63001), "B-C-D-E"), - KeyValue.pair(windowFor(66001), "E"), KeyValue.pair(windowFor(59000), "A-B-C-D-E")); logStateStore(testDriver); sendAt("F", 70); assertThatOutcomeIs( - KeyValue.pair(windowFor(66001), "E-F"), - KeyValue.pair(windowFor(65001), "D-E-F"), - KeyValue.pair(windowFor(64001), "C-D-E-F"), - KeyValue.pair(windowFor(63001), "B-C-D-E-F"), - KeyValue.pair(windowFor(69001), "F"), KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); logStateStore(testDriver); sendAt("G", 74); assertThatOutcomeIs( - KeyValue.pair(windowFor(69001), "F-G"), - KeyValue.pair(windowFor(66001), "E-F-G"), - KeyValue.pair(windowFor(65001), "D-E-F-G"), - KeyValue.pair(windowFor(64001), "C-D-E-F-G"), - KeyValue.pair(windowFor(70001), "G"), + KeyValue.pair(windowFor(63001), "B-C-D-E-F"), KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); logStateStore(testDriver); sendAt("H", 75); assertThatOutcomeIs( - KeyValue.pair(windowFor(70001), "G-H"), - KeyValue.pair(windowFor(69001), "F-G-H"), - KeyValue.pair(windowFor(66001), "E-F-G-H"), - KeyValue.pair(windowFor(65001), "D-E-F-G-H"), - KeyValue.pair(windowFor(74001), "H"), + KeyValue.pair(windowFor(64001), "C-D-E-F-G"), KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); logStateStore(testDriver); sendAt("I", 100); assertThatOutcomeIs( + KeyValue.pair(windowFor(65001), "D-E-F-G-H"), + KeyValue.pair(windowFor(66001), "E-F-G-H"), + KeyValue.pair(windowFor(69001), "F-G-H"), + KeyValue.pair(windowFor(70001), "G-H"), + KeyValue.pair(windowFor(74001), "H"), KeyValue.pair(windowFor(90000), "I")); logStateStore(testDriver);