From: Kai Moritz <kai@juplo.de>
Date: Mon, 8 Jul 2024 14:33:34 +0000 (+0200)
Subject: Activated supression of intermediate results and adapted expected result
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d5f20ac4b341db817468ca10e85710f7ffbd2268;p=demos%2Fkafka%2Fwordcount

Activated supression of intermediate results and adapted expected result
---

diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java
index 95e082d..7efd751 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java
@@ -40,6 +40,7 @@ public class AggregationTopologyTest
         .reduce(
             (aggregate, value) -> aggregate + "-" + value,
             Materialized.as(STORE_NAME))
+        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);
 
@@ -70,73 +71,55 @@ public class AggregationTopologyTest
 
     sendAt("B", 64);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B"),
         KeyValue.pair(windowFor(54000), "A-B"));
 
     logStateStore(testDriver);
 
     sendAt("C", 65);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B-C"),
-        KeyValue.pair(windowFor(64001), "C"),
         KeyValue.pair(windowFor(55000), "A-B-C"));
 
     logStateStore(testDriver);
 
     sendAt("D", 66);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(64001), "C-D"),
-        KeyValue.pair(windowFor(63001), "B-C-D"),
-        KeyValue.pair(windowFor(65001), "D"),
         KeyValue.pair(windowFor(56000), "A-B-C-D"));
 
     logStateStore(testDriver);
 
     sendAt("E", 69);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(65001), "D-E"),
-        KeyValue.pair(windowFor(64001), "C-D-E"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E"),
-        KeyValue.pair(windowFor(66001), "E"),
         KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
 
     logStateStore(testDriver);
 
     sendAt("F", 70);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66001), "E-F"),
-        KeyValue.pair(windowFor(65001), "D-E-F"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
-        KeyValue.pair(windowFor(69001), "F"),
         KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
 
     logStateStore(testDriver);
 
     sendAt("G", 74);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(69001), "F-G"),
-        KeyValue.pair(windowFor(66001), "E-F-G"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
-        KeyValue.pair(windowFor(70001), "G"),
+        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
         KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
 
     logStateStore(testDriver);
 
     sendAt("H", 75);
     assertThatOutcomeIs(
-        KeyValue.pair(windowFor(70001), "G-H"),
-        KeyValue.pair(windowFor(69001), "F-G-H"),
-        KeyValue.pair(windowFor(66001), "E-F-G-H"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
-        KeyValue.pair(windowFor(74001), "H"),
+        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
         KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
 
     logStateStore(testDriver);
 
     sendAt("I", 100);
     assertThatOutcomeIs(
+        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
+        KeyValue.pair(windowFor(66001), "E-F-G-H"),
+        KeyValue.pair(windowFor(69001), "F-G-H"),
+        KeyValue.pair(windowFor(70001), "G-H"),
+        KeyValue.pair(windowFor(74001), "H"),
         KeyValue.pair(windowFor(90000), "I"));
 
     logStateStore(testDriver);