// Define sliding window of size 5 minutes with a grace period of 1 minute
SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
- KGroupedStream<String, String> groupedStream = input.groupByKey();
+ KGroupedStream<String, String> groupedStream = input
+ .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
+ .groupByKey();
KTable<Windowed<String>, Long> aggregatedTable = groupedStream
.windowedBy(slidingWindow)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
- aggregatedTable.toStream().foreach((windowedKey, count) -> {
- long windowEnd = windowedKey.window().end();
-
- // Simulate window close action
- if (System.currentTimeMillis() > windowEnd + Duration.ofMinutes(1).toMillis()) {
- System.out.println("Window closed for key: " + windowedKey.key() + " with count: " + count);
- } else {
- System.out.println("Window still open for key: " + windowedKey.key() + " with count: " + count);
- }
- });
+ aggregatedTable
+ .toStream()
+ .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
+ .to(OUTPUT);
Topology topology = builder.build();
log.info("Generated topology: {}", topology.describe());