From d584554e3e760a2661e98dd5940a3cc800ad297e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 17 Jul 2024 12:50:08 +0200 Subject: [PATCH] This example needs an explicit serialization-config --- .../kafka/wordcount/counter/AggregationTopologyTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 7efd751..5c2ff6b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,10 +2,12 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -39,7 +41,9 @@ public class AggregationTopologyTest .windowedBy(WINDOWS) .reduce( (aggregate, value) -> aggregate + "-" + value, - Materialized.as(STORE_NAME)) + Materialized.>as(STORE_NAME) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT); -- 2.20.1