import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
input
.groupByKey()
.windowedBy(WINDOWS)
- .reduce(
- (aggregate, value) -> aggregate + "-" + value,
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
- .withKeySerde(Serdes.String())
- .withValueSerde(Serdes.String()))
+ .reduce((aggregate, value) -> aggregate + "-" + value)
.toStream((k,v) -> k.toString())
.to(OUTPUT);
void logStateStore(TopologyTestDriver testDriver)
{
- KeyValueIterator i = testDriver.getTimestampedWindowStore("aggregated-store").all();
+ KeyValueIterator i = testDriver.getTimestampedWindowStore("KSTREAM-REDUCE-STATE-STORE-0000000001").all();
while(i.hasNext())
{
Object o = i.next();