From: Kai Moritz Date: Wed, 17 Jul 2024 10:50:08 +0000 (+0200) Subject: This example needs an explicit serialization-config X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d584554e3e760a2661e98dd5940a3cc800ad297e;p=demos%2Fkafka%2Fwordcount This example needs an explicit serialization-config --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 7efd751..5c2ff6b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,10 +2,12 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -39,7 +41,9 @@ public class AggregationTopologyTest .windowedBy(WINDOWS) .reduce( (aggregate, value) -> aggregate + "-" + value, - Materialized.as(STORE_NAME)) + Materialized.>as(STORE_NAME) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream((k,v) -> k.toString()) .to(OUTPUT);