From a391c7aa85fa8790646442c21fb620a9e47d5cf0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Jul 2024 08:32:07 +0200 Subject: [PATCH] Supplying materialization details is not necessary --- .../counter/AggregationTopologyTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index e37132d..a4c6cff 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -2,12 +2,13 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -38,11 +39,7 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce( - (aggregate, value) -> aggregate + "-" + value, - Materialized.>as("aggregated-store") - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String())) + .reduce((aggregate, value) -> aggregate + "-" + value) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -205,7 +202,7 @@ public class AggregationTopologyTest void logStateStore(TopologyTestDriver testDriver) { - KeyValueIterator i = testDriver.getTimestampedWindowStore("aggregated-store").all(); + KeyValueIterator i = testDriver.getTimestampedWindowStore("KSTREAM-REDUCE-STATE-STORE-0000000001").all(); while(i.hasNext()) { Object o = i.next(); -- 2.20.1