The content of the state store is logged after each step
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 18:34:40 +0000 (20:34 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:43:07 +0000 (12:43 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 8233a70..e37132d 100644 (file)
@@ -2,12 +2,12 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -38,7 +38,11 @@ public class AggregationTopologyTest
     input
         .groupByKey()
         .windowedBy(WINDOWS)
-        .reduce((aggregate, value) -> aggregate + "-" + value)
+        .reduce(
+            (aggregate, value) -> aggregate + "-" + value,
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);
 
@@ -65,17 +69,23 @@ public class AggregationTopologyTest
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(53000), "A"));
 
+    logStateStore(testDriver);
+
     sendAt("B", 64);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(63001), "B"),
         KeyValue.pair(windowFor(54000), "A-B"));
 
+    logStateStore(testDriver);
+
     sendAt("C", 65);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(63001), "B-C"),
         KeyValue.pair(windowFor(64001), "C"),
         KeyValue.pair(windowFor(55000), "A-B-C"));
 
+    logStateStore(testDriver);
+
     sendAt("D", 66);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(64001), "C-D"),
@@ -83,6 +93,8 @@ public class AggregationTopologyTest
         KeyValue.pair(windowFor(65001), "D"),
         KeyValue.pair(windowFor(56000), "A-B-C-D"));
 
+    logStateStore(testDriver);
+
     sendAt("E", 69);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(65001), "D-E"),
@@ -91,6 +103,8 @@ public class AggregationTopologyTest
         KeyValue.pair(windowFor(66001), "E"),
         KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
 
+    logStateStore(testDriver);
+
     sendAt("F", 70);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(66001), "E-F"),
@@ -100,6 +114,8 @@ public class AggregationTopologyTest
         KeyValue.pair(windowFor(69001), "F"),
         KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
 
+    logStateStore(testDriver);
+
     sendAt("G", 74);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(69001), "F-G"),
@@ -109,6 +125,8 @@ public class AggregationTopologyTest
         KeyValue.pair(windowFor(70001), "G"),
         KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
 
+    logStateStore(testDriver);
+
     sendAt("H", 75);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(70001), "G-H"),
@@ -118,21 +136,31 @@ public class AggregationTopologyTest
         KeyValue.pair(windowFor(74001), "H"),
         KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
 
+    logStateStore(testDriver);
+
     sendAt("I", 100);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(90000), "I"));
 
+    logStateStore(testDriver);
+
     sendAt("J", 120);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(110000), "J"));
 
+    logStateStore(testDriver);
+
     sendAt("K", 140);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(130000), "K"));
 
+    logStateStore(testDriver);
+
     sendAt("L", 160);
     assertThatOutcomeIs(
         KeyValue.pair(windowFor(150000), "L"));
+
+    logStateStore(testDriver);
   }
 
 
@@ -175,6 +203,15 @@ public class AggregationTopologyTest
         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
   }
 
+  void logStateStore(TopologyTestDriver testDriver)
+  {
+    KeyValueIterator i = testDriver.getTimestampedWindowStore("aggregated-store").all();
+    while(i.hasNext())
+    {
+      Object o = i.next();
+      log.info("{}", o);
+    }
+  }
 
   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");