public class AggregationTopologyTest
{
static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
- static final Duration ADVANCE = Duration.ofSeconds(3);
- static final TimeWindows WINDOWS = TimeWindows
- .ofSizeWithNoGrace(WINDOW_SIZE)
- .advanceBy(ADVANCE);
+ static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
@Test
sendAt("A", 63);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(54), "A"),
- KeyValue.pair(windowFor(57), "A"),
- KeyValue.pair(windowFor(60), "A"),
- KeyValue.pair(windowFor(63), "A"));
+ KeyValue.pair(windowFor(53000), "A"));
sendAt("B", 64);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B"),
- KeyValue.pair(windowFor(60), "A-B"),
- KeyValue.pair(windowFor(63), "A-B"));
+ KeyValue.pair(windowFor(63001), "B"),
+ KeyValue.pair(windowFor(54000), "A-B"));
sendAt("C", 65);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B-C"),
- KeyValue.pair(windowFor(60), "A-B-C"),
- KeyValue.pair(windowFor(63), "A-B-C"));
+ KeyValue.pair(windowFor(63001), "B-C"),
+ KeyValue.pair(windowFor(64001), "C"),
+ KeyValue.pair(windowFor(55000), "A-B-C"));
sendAt("D", 66);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B-C-D"),
- KeyValue.pair(windowFor(60), "A-B-C-D"),
- KeyValue.pair(windowFor(63), "A-B-C-D"),
- KeyValue.pair(windowFor(66), "D"));
+ KeyValue.pair(windowFor(64001), "C-D"),
+ KeyValue.pair(windowFor(63001), "B-C-D"),
+ KeyValue.pair(windowFor(65001), "D"),
+ KeyValue.pair(windowFor(56000), "A-B-C-D"));
sendAt("E", 69);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(60), "A-B-C-D-E"),
- KeyValue.pair(windowFor(63), "A-B-C-D-E"),
- KeyValue.pair(windowFor(66), "D-E"),
- KeyValue.pair(windowFor(69), "E"));
+ KeyValue.pair(windowFor(65001), "D-E"),
+ KeyValue.pair(windowFor(64001), "C-D-E"),
+ KeyValue.pair(windowFor(63001), "B-C-D-E"),
+ KeyValue.pair(windowFor(66001), "E"),
+ KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
sendAt("F", 70);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
- KeyValue.pair(windowFor(66), "D-E-F"),
- KeyValue.pair(windowFor(69), "E-F"));
+ KeyValue.pair(windowFor(66001), "E-F"),
+ KeyValue.pair(windowFor(65001), "D-E-F"),
+ KeyValue.pair(windowFor(64001), "C-D-E-F"),
+ KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
+ KeyValue.pair(windowFor(69001), "F"),
+ KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
sendAt("G", 74);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(66), "D-E-F-G"),
- KeyValue.pair(windowFor(69), "E-F-G"),
- KeyValue.pair(windowFor(72), "G"));
+ KeyValue.pair(windowFor(69001), "F-G"),
+ KeyValue.pair(windowFor(66001), "E-F-G"),
+ KeyValue.pair(windowFor(65001), "D-E-F-G"),
+ KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
+ KeyValue.pair(windowFor(70001), "G"),
+ KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
sendAt("H", 75);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(66), "D-E-F-G-H"),
- KeyValue.pair(windowFor(69), "E-F-G-H"),
- KeyValue.pair(windowFor(72), "G-H"),
- KeyValue.pair(windowFor(75), "H"));
+ KeyValue.pair(windowFor(70001), "G-H"),
+ KeyValue.pair(windowFor(69001), "F-G-H"),
+ KeyValue.pair(windowFor(66001), "E-F-G-H"),
+ KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
+ KeyValue.pair(windowFor(74001), "H"),
+ KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
sendAt("I", 100);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(93), "I"),
- KeyValue.pair(windowFor(96), "I"),
- KeyValue.pair(windowFor(99), "I"));
+ KeyValue.pair(windowFor(90000), "I"));
sendAt("J", 120);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(111), "J"),
- KeyValue.pair(windowFor(114), "J"),
- KeyValue.pair(windowFor(117), "J"),
- KeyValue.pair(windowFor(120), "J"));
+ KeyValue.pair(windowFor(110000), "J"));
sendAt("K", 140);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(132), "K"),
- KeyValue.pair(windowFor(135), "K"),
- KeyValue.pair(windowFor(138), "K"));
+ KeyValue.pair(windowFor(130000), "K"));
sendAt("L", 160);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(153), "L"),
- KeyValue.pair(windowFor(156), "L"),
- KeyValue.pair(windowFor(159), "L"));
+ KeyValue.pair(windowFor(150000), "L"));
}
return new Windowed<>(key, window);
}
- Windowed<String> windowFor(int second)
+ Windowed<String> windowFor(long milli)
{
- Instant startTime = Instant.ofEpochSecond(second);
+ Instant startTime = Instant.ofEpochMilli(milli);
Instant endTime = startTime.plus(WINDOW_SIZE);
TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
return new Windowed<>(KEY, window);