Activated supression of intermediate results and adapted expected result
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 14:33:34 +0000 (16:33 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 09:59:55 +0000 (11:59 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 36715bd..65eeeab 100644 (file)
@@ -2,12 +2,11 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -41,7 +40,12 @@ public class AggregationTopologyTest
     input
         .groupByKey()
         .windowedBy(WINDOWS)
-        .reduce((aggregate, value) -> aggregate + "-" + value)
+        .reduce(
+            (aggregate, value) -> aggregate + "-" + value,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()))
+        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);
 
@@ -65,81 +69,59 @@ public class AggregationTopologyTest
 
 
     sendAt("A", 63);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(54), "A"),
-        KeyValue.pair(windowFor(57), "A"),
-        KeyValue.pair(windowFor(60), "A"),
-        KeyValue.pair(windowFor(63), "A"));
+    assertThatOutcomeIs();
 
     sendAt("B", 64);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B"),
-        KeyValue.pair(windowFor(60), "A-B"),
-        KeyValue.pair(windowFor(63), "A-B"));
+    assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A"));
 
     sendAt("C", 65);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B-C"),
-        KeyValue.pair(windowFor(60), "A-B-C"),
-        KeyValue.pair(windowFor(63), "A-B-C"));
+    assertThatOutcomeIs();
 
     sendAt("D", 66);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(57), "A-B-C-D"),
-        KeyValue.pair(windowFor(60), "A-B-C-D"),
-        KeyValue.pair(windowFor(63), "A-B-C-D"),
-        KeyValue.pair(windowFor(66), "D"));
+    assertThatOutcomeIs();
 
     sendAt("E", 69);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(60), "A-B-C-D-E"),
-        KeyValue.pair(windowFor(63), "A-B-C-D-E"),
-        KeyValue.pair(windowFor(66), "D-E"),
-        KeyValue.pair(windowFor(69), "E"));
+    assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D"));
 
     sendAt("F", 70);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
-        KeyValue.pair(windowFor(66), "D-E-F"),
-        KeyValue.pair(windowFor(69), "E-F"));
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
 
     sendAt("G", 74);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66), "D-E-F-G"),
-        KeyValue.pair(windowFor(69), "E-F-G"),
-        KeyValue.pair(windowFor(72), "G"));
+    assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F"));
 
     sendAt("H", 75);
+    assertThatOutcomeIs();
+
+    sendAt("I", 100);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(66), "D-E-F-G-H"),
         KeyValue.pair(windowFor(69), "E-F-G-H"),
         KeyValue.pair(windowFor(72), "G-H"),
         KeyValue.pair(windowFor(75), "H"));
 
-    sendAt("I", 100);
+    sendAt("J", 120);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(93), "I"),
         KeyValue.pair(windowFor(96), "I"),
         KeyValue.pair(windowFor(99), "I"));
 
-    sendAt("J", 120);
+    sendAt("K", 140);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(111), "J"),
         KeyValue.pair(windowFor(114), "J"),
         KeyValue.pair(windowFor(117), "J"),
         KeyValue.pair(windowFor(120), "J"));
 
-    sendAt("K", 140);
+    sendAt("L", 160);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(132), "K"),
         KeyValue.pair(windowFor(135), "K"),
         KeyValue.pair(windowFor(138), "K"));
 
-    sendAt("L", 160);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(153), "L"),
-        KeyValue.pair(windowFor(156), "L"),
-        KeyValue.pair(windowFor(159), "L"));
+    // Never received, if no newer message is send
+    // KeyValue.pair(windowFor(153), "L")
+    // KeyValue.pair(windowFor(156), "L")
+    //KeyValue.pair(windowFor(159), "L")
   }