import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
@Slf4j
public class AggregationTopologyTest
{
- static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
- static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
+ static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
+ static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
@Test
sendAt("A", 63);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(53000), "A"));
+ KeyValue.pair(windowFor(63, 63), "A"));
sendAt("B", 64);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(63001), "B"),
- KeyValue.pair(windowFor(54000), "A-B"));
+ KeyValue.pair(windowFor(63, 63), null),
+ KeyValue.pair(windowFor(63, 64), "A-B"));
sendAt("C", 65);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(63001), "B-C"),
- KeyValue.pair(windowFor(64001), "C"),
- KeyValue.pair(windowFor(55000), "A-B-C"));
+ KeyValue.pair(windowFor(63, 64), null),
+ KeyValue.pair(windowFor(63, 65), "A-B-C"));
sendAt("D", 66);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(64001), "C-D"),
- KeyValue.pair(windowFor(63001), "B-C-D"),
- KeyValue.pair(windowFor(65001), "D"),
- KeyValue.pair(windowFor(56000), "A-B-C-D"));
+ KeyValue.pair(windowFor(63, 65), null),
+ KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
sendAt("E", 69);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(65001), "D-E"),
- KeyValue.pair(windowFor(64001), "C-D-E"),
- KeyValue.pair(windowFor(63001), "B-C-D-E"),
- KeyValue.pair(windowFor(66001), "E"),
- KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
+ KeyValue.pair(windowFor(69, 69), "E"));
sendAt("F", 70);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(66001), "E-F"),
- KeyValue.pair(windowFor(65001), "D-E-F"),
- KeyValue.pair(windowFor(64001), "C-D-E-F"),
- KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
- KeyValue.pair(windowFor(69001), "F"),
- KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
+ KeyValue.pair(windowFor(69, 69), null),
+ KeyValue.pair(windowFor(69, 70), "E-F"));
sendAt("G", 74);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(69001), "F-G"),
- KeyValue.pair(windowFor(66001), "E-F-G"),
- KeyValue.pair(windowFor(65001), "D-E-F-G"),
- KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
- KeyValue.pair(windowFor(70001), "G"),
- KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
+ KeyValue.pair(windowFor(74, 74), "G"));
sendAt("H", 75);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(70001), "G-H"),
- KeyValue.pair(windowFor(69001), "F-G-H"),
- KeyValue.pair(windowFor(66001), "E-F-G-H"),
- KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
- KeyValue.pair(windowFor(74001), "H"),
- KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
+ KeyValue.pair(windowFor(74, 74), null),
+ KeyValue.pair(windowFor(74, 75), "G-H"));
sendAt("I", 100);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(90000), "I"));
+ KeyValue.pair(windowFor(100, 100), "I"));
sendAt("J", 120);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(110000), "J"));
+ KeyValue.pair(windowFor(120, 120), "J"));
sendAt("K", 140);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(130000), "K"));
+ KeyValue.pair(windowFor(140, 140), "K"));
sendAt("L", 160);
assertThatOutcomeIs(
- KeyValue.pair(windowFor(150000), "L"));
+ KeyValue.pair(windowFor(160, 160), "L"));
}
String start = matcher.group(2);
String end = matcher.group(3);
- Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
+ Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
return new Windowed<>(key, window);
}
- Windowed<String> windowFor(long milli)
+ Windowed<String> windowFor(int startSecond, int endSecond)
{
- Instant startTime = Instant.ofEpochMilli(milli);
- Instant endTime = startTime.plus(WINDOW_SIZE);
- TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
+ Instant startTime = Instant.ofEpochSecond(startSecond);
+ Instant endTime = Instant.ofEpochSecond(endSecond);
+ SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
return new Windowed<>(KEY, window);
}
}