public class AggregationTopologyTest
{
static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
- static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE);
+ static final Duration ADVANCE = Duration.ofSeconds(3);
+ static final TimeWindows WINDOWS = TimeWindows
+ .ofSizeWithNoGrace(WINDOW_SIZE)
+ .advanceBy(ADVANCE);
@Test
sendAt("A", 63);
- assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(54), "A"),
+ KeyValue.pair(windowFor(57), "A"),
+ KeyValue.pair(windowFor(60), "A"),
+ KeyValue.pair(windowFor(63), "A"));
sendAt("B", 64);
- assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(57), "A-B"),
+ KeyValue.pair(windowFor(60), "A-B"),
+ KeyValue.pair(windowFor(63), "A-B"));
sendAt("C", 65);
- assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(57), "A-B-C"),
+ KeyValue.pair(windowFor(60), "A-B-C"),
+ KeyValue.pair(windowFor(63), "A-B-C"));
sendAt("D", 66);
- assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(57), "A-B-C-D"),
+ KeyValue.pair(windowFor(60), "A-B-C-D"),
+ KeyValue.pair(windowFor(63), "A-B-C-D"),
+ KeyValue.pair(windowFor(66), "D"));
sendAt("E", 69);
- assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(60), "A-B-C-D-E"),
+ KeyValue.pair(windowFor(63), "A-B-C-D-E"),
+ KeyValue.pair(windowFor(66), "D-E"),
+ KeyValue.pair(windowFor(69), "E"));
sendAt("F", 70);
- assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
+ KeyValue.pair(windowFor(66), "D-E-F"),
+ KeyValue.pair(windowFor(69), "E-F"));
sendAt("G", 74);
- assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(66), "D-E-F-G"),
+ KeyValue.pair(windowFor(69), "E-F-G"),
+ KeyValue.pair(windowFor(72), "G"));
sendAt("H", 75);
- assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(66), "D-E-F-G-H"),
+ KeyValue.pair(windowFor(69), "E-F-G-H"),
+ KeyValue.pair(windowFor(72), "G-H"),
+ KeyValue.pair(windowFor(75), "H"));
sendAt("I", 100);
- assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(93), "I"),
+ KeyValue.pair(windowFor(96), "I"),
+ KeyValue.pair(windowFor(99), "I"));
sendAt("J", 120);
- assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(111), "J"),
+ KeyValue.pair(windowFor(114), "J"),
+ KeyValue.pair(windowFor(117), "J"),
+ KeyValue.pair(windowFor(120), "J"));
sendAt("K", 140);
- assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(132), "K"),
+ KeyValue.pair(windowFor(135), "K"),
+ KeyValue.pair(windowFor(138), "K"));
sendAt("L", 160);
- assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L"));
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(153), "L"),
+ KeyValue.pair(windowFor(156), "L"),
+ KeyValue.pair(windowFor(159), "L"));
}