This example needs an explicit serialization-config
authorKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:50:08 +0000 (12:50 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:50:08 +0000 (12:50 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index 7efd751..5c2ff6b 100644 (file)
@@ -2,10 +2,12 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -39,7 +41,9 @@ public class AggregationTopologyTest
         .windowedBy(WINDOWS)
         .reduce(
             (aggregate, value) -> aggregate + "-" + value,
-            Materialized.as(STORE_NAME))
+            Materialized.<String, String, WindowStore<Bytes, byte[]>>as(STORE_NAME)
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String()))
         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);