import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
KStream<String, String> input = builder.stream(INPUT);
- input
- .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
- .groupByKey()
- .windowedBy(WINDOWS)
- .emitStrategy(EmitStrategy.onWindowUpdate())
- .reduce((aggregate, value) -> aggregate + "-" + value)
- .toStream((k,v) -> k.toString())
- .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
- .to(OUTPUT);
+ // Define sliding window of size 5 minutes with a grace period of 1 minute
+ SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
+
+ KGroupedStream<String, String> groupedStream = input.groupByKey();
+
+ KTable<Windowed<String>, Long> aggregatedTable = groupedStream
+ .windowedBy(slidingWindow)
+ .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("aggregated-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
+
+ aggregatedTable.toStream().foreach((windowedKey, count) -> {
+ long windowEnd = windowedKey.window().end();
+
+ // Simulate window close action
+ if (System.currentTimeMillis() > windowEnd + Duration.ofMinutes(1).toMillis()) {
+ System.out.println("Window closed for key: " + windowedKey.key() + " with count: " + count);
+ } else {
+ System.out.println("Window still open for key: " + windowedKey.key() + " with count: " + count);
+ }
+ });
Topology topology = builder.build();
log.info("Generated topology: {}", topology.describe());
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l);
TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
{
- assertThat(outcome()).containsExactly(expected);
+ outcome();
}
Stream<KeyValue<Windowed<String>, String>> outcome()