From 7f0fbcf51acd347782a36220c5d767199616207b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 8 Jul 2024 20:34:40 +0200 Subject: [PATCH] The content of the state store is logged after each step --- .../counter/AggregationTopologyTest.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 8233a70..e37132d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,12 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -38,7 +38,11 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.>as("aggregated-store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -65,17 +69,23 @@ public class AggregationTopologyTest assertThatOutcomeIs( KeyValue.pair(windowFor(53000), "A")); + logStateStore(testDriver); + sendAt("B", 64); assertThatOutcomeIs( KeyValue.pair(windowFor(63001), "B"), KeyValue.pair(windowFor(54000), "A-B")); + logStateStore(testDriver); + sendAt("C", 65); assertThatOutcomeIs( KeyValue.pair(windowFor(63001), "B-C"), KeyValue.pair(windowFor(64001), "C"), KeyValue.pair(windowFor(55000), "A-B-C")); + logStateStore(testDriver); + sendAt("D", 66); assertThatOutcomeIs( KeyValue.pair(windowFor(64001), "C-D"), @@ -83,6 +93,8 @@ public class AggregationTopologyTest KeyValue.pair(windowFor(65001), "D"), KeyValue.pair(windowFor(56000), "A-B-C-D")); + logStateStore(testDriver); + sendAt("E", 69); assertThatOutcomeIs( KeyValue.pair(windowFor(65001), "D-E"), @@ -91,6 +103,8 @@ public class AggregationTopologyTest KeyValue.pair(windowFor(66001), "E"), KeyValue.pair(windowFor(59000), "A-B-C-D-E")); + logStateStore(testDriver); + sendAt("F", 70); assertThatOutcomeIs( KeyValue.pair(windowFor(66001), "E-F"), @@ -100,6 +114,8 @@ public class AggregationTopologyTest KeyValue.pair(windowFor(69001), "F"), KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); + logStateStore(testDriver); + sendAt("G", 74); assertThatOutcomeIs( KeyValue.pair(windowFor(69001), "F-G"), @@ -109,6 +125,8 @@ public class AggregationTopologyTest KeyValue.pair(windowFor(70001), "G"), KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); + logStateStore(testDriver); + sendAt("H", 75); assertThatOutcomeIs( KeyValue.pair(windowFor(70001), "G-H"), @@ -118,21 +136,31 @@ public class AggregationTopologyTest KeyValue.pair(windowFor(74001), "H"), KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); + logStateStore(testDriver); + sendAt("I", 100); assertThatOutcomeIs( KeyValue.pair(windowFor(90000), "I")); + logStateStore(testDriver); + sendAt("J", 120); assertThatOutcomeIs( KeyValue.pair(windowFor(110000), "J")); + logStateStore(testDriver); + sendAt("K", 140); assertThatOutcomeIs( KeyValue.pair(windowFor(130000), "K")); + logStateStore(testDriver); + sendAt("L", 160); assertThatOutcomeIs( KeyValue.pair(windowFor(150000), "L")); + + logStateStore(testDriver); } @@ -175,6 +203,15 @@ public class AggregationTopologyTest .map(record -> KeyValue.pair(parse(record.key()), record.value())); } + void logStateStore(TopologyTestDriver testDriver) + { + KeyValueIterator i = testDriver.getTimestampedWindowStore("aggregated-store").all(); + while(i.hasNext()) + { + Object o = i.next(); + log.info("{}", o); + } + } static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$"); -- 2.20.1