From: Kai Moritz Date: Fri, 12 Jul 2024 06:34:16 +0000 (+0200) Subject: But a name should still be set for the store X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4c012aa1242362899e8b5460a61eafada1a769b1;p=demos%2Fkafka%2Fwordcount But a name should still be set for the store --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index a4c6cff..95e082d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,10 +3,7 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.SlidingWindows; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.test.TestRecord; @@ -25,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { + static final String STORE_NAME = "aggregate-store"; static final Duration WINDOW_SIZE = Duration.ofSeconds(10); static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE); @@ -39,7 +37,9 @@ public class AggregationTopologyTest input .groupByKey() .windowedBy(WINDOWS) - .reduce((aggregate, value) -> aggregate + "-" + value) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.as(STORE_NAME)) .toStream((k,v) -> k.toString()) .to(OUTPUT); @@ -202,7 +202,7 @@ public class AggregationTopologyTest void logStateStore(TopologyTestDriver testDriver) { - KeyValueIterator i = testDriver.getTimestampedWindowStore("KSTREAM-REDUCE-STATE-STORE-0000000001").all(); + KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all(); while(i.hasNext()) { Object o = i.next();