import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
.windowedBy(WINDOWS)
.reduce(
(aggregate, value) -> aggregate + "-" + value,
- Materialized.as(STORE_NAME))
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as(STORE_NAME)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.toString())
.to(OUTPUT);