import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
input
.groupByKey()
.windowedBy(WINDOWS)
- .reduce((aggregate, value) -> aggregate + "-" + value)
+ .reduce(
+ (aggregate, value) -> aggregate + "-" + value,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()))
.toStream((k,v) -> k.toString())
.to(OUTPUT);
assertThatOutcomeIs(
KeyValue.pair(windowFor(53000), "A"));
+ logStateStore(testDriver);
+
sendAt("B", 64);
assertThatOutcomeIs(
KeyValue.pair(windowFor(63001), "B"),
KeyValue.pair(windowFor(54000), "A-B"));
+ logStateStore(testDriver);
+
sendAt("C", 65);
assertThatOutcomeIs(
KeyValue.pair(windowFor(63001), "B-C"),
KeyValue.pair(windowFor(64001), "C"),
KeyValue.pair(windowFor(55000), "A-B-C"));
+ logStateStore(testDriver);
+
sendAt("D", 66);
assertThatOutcomeIs(
KeyValue.pair(windowFor(64001), "C-D"),
KeyValue.pair(windowFor(65001), "D"),
KeyValue.pair(windowFor(56000), "A-B-C-D"));
+ logStateStore(testDriver);
+
sendAt("E", 69);
assertThatOutcomeIs(
KeyValue.pair(windowFor(65001), "D-E"),
KeyValue.pair(windowFor(66001), "E"),
KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
+ logStateStore(testDriver);
+
sendAt("F", 70);
assertThatOutcomeIs(
KeyValue.pair(windowFor(66001), "E-F"),
KeyValue.pair(windowFor(69001), "F"),
KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
+ logStateStore(testDriver);
+
sendAt("G", 74);
assertThatOutcomeIs(
KeyValue.pair(windowFor(69001), "F-G"),
KeyValue.pair(windowFor(70001), "G"),
KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
+ logStateStore(testDriver);
+
sendAt("H", 75);
assertThatOutcomeIs(
KeyValue.pair(windowFor(70001), "G-H"),
KeyValue.pair(windowFor(74001), "H"),
KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
+ logStateStore(testDriver);
+
sendAt("I", 100);
assertThatOutcomeIs(
KeyValue.pair(windowFor(90000), "I"));
+ logStateStore(testDriver);
+
sendAt("J", 120);
assertThatOutcomeIs(
KeyValue.pair(windowFor(110000), "J"));
+ logStateStore(testDriver);
+
sendAt("K", 140);
assertThatOutcomeIs(
KeyValue.pair(windowFor(130000), "K"));
+ logStateStore(testDriver);
+
sendAt("L", 160);
assertThatOutcomeIs(
KeyValue.pair(windowFor(150000), "L"));
+
+ logStateStore(testDriver);
}
.map(record -> KeyValue.pair(parse(record.key()), record.value()));
}
+ void logStateStore(TopologyTestDriver testDriver)
+ {
+ KeyValueIterator i = testDriver.getTimestampedWindowStore("aggregated-store").all();
+ while(i.hasNext())
+ {
+ Object o = i.next();
+ log.info("{}", o);
+ }
+ }
static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");