import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
input
.groupByKey()
.windowedBy(WINDOWS)
- .reduce((aggregate, value) -> aggregate + "-" + value)
+ .reduce(
+ (aggregate, value) -> aggregate + "-" + value,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()))
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.toString())
.to(OUTPUT);
sendAt("A", 63);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(54), "A"),
- KeyValue.pair(windowFor(57), "A"),
- KeyValue.pair(windowFor(60), "A"),
- KeyValue.pair(windowFor(63), "A"));
+ assertThatOutcomeIs();
sendAt("B", 64);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B"),
- KeyValue.pair(windowFor(60), "A-B"),
- KeyValue.pair(windowFor(63), "A-B"));
+ assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A"));
sendAt("C", 65);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B-C"),
- KeyValue.pair(windowFor(60), "A-B-C"),
- KeyValue.pair(windowFor(63), "A-B-C"));
+ assertThatOutcomeIs();
sendAt("D", 66);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(57), "A-B-C-D"),
- KeyValue.pair(windowFor(60), "A-B-C-D"),
- KeyValue.pair(windowFor(63), "A-B-C-D"),
- KeyValue.pair(windowFor(66), "D"));
+ assertThatOutcomeIs();
sendAt("E", 69);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(60), "A-B-C-D-E"),
- KeyValue.pair(windowFor(63), "A-B-C-D-E"),
- KeyValue.pair(windowFor(66), "D-E"),
- KeyValue.pair(windowFor(69), "E"));
+ assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D"));
sendAt("F", 70);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(63), "A-B-C-D-E-F"),
- KeyValue.pair(windowFor(66), "D-E-F"),
- KeyValue.pair(windowFor(69), "E-F"));
+ assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
sendAt("G", 74);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(66), "D-E-F-G"),
- KeyValue.pair(windowFor(69), "E-F-G"),
- KeyValue.pair(windowFor(72), "G"));
+ assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F"));
sendAt("H", 75);
+ assertThatOutcomeIs();
+
+ sendAt("I", 100);
assertThatOutcomeIs(
KeyValue.pair(windowFor(66), "D-E-F-G-H"),
KeyValue.pair(windowFor(69), "E-F-G-H"),
KeyValue.pair(windowFor(72), "G-H"),
KeyValue.pair(windowFor(75), "H"));
- sendAt("I", 100);
+ sendAt("J", 120);
assertThatOutcomeIs(
KeyValue.pair(windowFor(93), "I"),
KeyValue.pair(windowFor(96), "I"),
KeyValue.pair(windowFor(99), "I"));
- sendAt("J", 120);
+ sendAt("K", 140);
assertThatOutcomeIs(
KeyValue.pair(windowFor(111), "J"),
KeyValue.pair(windowFor(114), "J"),
KeyValue.pair(windowFor(117), "J"),
KeyValue.pair(windowFor(120), "J"));
- sendAt("K", 140);
+ sendAt("L", 160);
assertThatOutcomeIs(
KeyValue.pair(windowFor(132), "K"),
KeyValue.pair(windowFor(135), "K"),
KeyValue.pair(windowFor(138), "K"));
- sendAt("L", 160);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(153), "L"),
- KeyValue.pair(windowFor(156), "L"),
- KeyValue.pair(windowFor(159), "L"));
+ // Never received, if no newer message is send
+ // KeyValue.pair(windowFor(153), "L")
+ // KeyValue.pair(windowFor(156), "L")
+ //KeyValue.pair(windowFor(159), "L")
}