import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
         .windowedBy(WINDOWS)
         .reduce(
             (aggregate, value) -> aggregate + "-" + value,
-            Materialized.as(STORE_NAME))
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as(STORE_NAME)
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()))
         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);