.reduce(
(aggregate, value) -> aggregate + "-" + value,
Materialized.as(STORE_NAME))
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.toString())
.to(OUTPUT);
sendAt("B", 64);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(63001), "B"),
KeyValue.pair(windowFor(54000), "A-B"));
logStateStore(testDriver);
sendAt("C", 65);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(63001), "B-C"),
- KeyValue.pair(windowFor(64001), "C"),
KeyValue.pair(windowFor(55000), "A-B-C"));
logStateStore(testDriver);
sendAt("D", 66);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(64001), "C-D"),
- KeyValue.pair(windowFor(63001), "B-C-D"),
- KeyValue.pair(windowFor(65001), "D"),
KeyValue.pair(windowFor(56000), "A-B-C-D"));
logStateStore(testDriver);
sendAt("E", 69);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(65001), "D-E"),
- KeyValue.pair(windowFor(64001), "C-D-E"),
- KeyValue.pair(windowFor(63001), "B-C-D-E"),
- KeyValue.pair(windowFor(66001), "E"),
KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
logStateStore(testDriver);
sendAt("F", 70);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(66001), "E-F"),
- KeyValue.pair(windowFor(65001), "D-E-F"),
- KeyValue.pair(windowFor(64001), "C-D-E-F"),
- KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
- KeyValue.pair(windowFor(69001), "F"),
KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
logStateStore(testDriver);
sendAt("G", 74);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(69001), "F-G"),
- KeyValue.pair(windowFor(66001), "E-F-G"),
- KeyValue.pair(windowFor(65001), "D-E-F-G"),
- KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
- KeyValue.pair(windowFor(70001), "G"),
+ KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
logStateStore(testDriver);
sendAt("H", 75);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(70001), "G-H"),
- KeyValue.pair(windowFor(69001), "F-G-H"),
- KeyValue.pair(windowFor(66001), "E-F-G-H"),
- KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
- KeyValue.pair(windowFor(74001), "H"),
+ KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
logStateStore(testDriver);
sendAt("I", 100);
assertThatOutcomeIs(
+ KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
+ KeyValue.pair(windowFor(66001), "E-F-G-H"),
+ KeyValue.pair(windowFor(69001), "F-G-H"),
+ KeyValue.pair(windowFor(70001), "G-H"),
+ KeyValue.pair(windowFor(74001), "H"),
KeyValue.pair(windowFor(90000), "I"));
logStateStore(testDriver);