@Slf4j
public class AggregationTopologyTest
{
- static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10));
+ static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
+ static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE);
@Test
Windowed<String> windowFor(int second)
{
- Instant time = Instant.ofEpochSecond(second);
- long timestamp = time.toEpochMilli();
- return new Windowed<>(KEY, WINDOWS.windowsFor(timestamp).values().stream().findFirst().get());
+ Instant startTime = Instant.ofEpochSecond(second);
+ Instant endTime = startTime.plus(WINDOW_SIZE);
+ TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
+ return new Windowed<>(KEY, window);
}
}