But a name should still be set for the store
authorKai Moritz <kai@juplo.de>
Fri, 12 Jul 2024 06:34:16 +0000 (08:34 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:43:07 +0000 (12:43 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index a4c6cff..95e082d 100644 (file)
@@ -3,10 +3,7 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.SlidingWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.test.TestRecord;
@@ -25,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Slf4j
 public class AggregationTopologyTest
 {
+  static final String STORE_NAME = "aggregate-store";
   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
   static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
 
@@ -39,7 +37,9 @@ public class AggregationTopologyTest
     input
         .groupByKey()
         .windowedBy(WINDOWS)
-        .reduce((aggregate, value) -> aggregate + "-" + value)
+        .reduce(
+            (aggregate, value) -> aggregate + "-" + value,
+            Materialized.as(STORE_NAME))
         .toStream((k,v) -> k.toString())
         .to(OUTPUT);
 
@@ -202,7 +202,7 @@ public class AggregationTopologyTest
 
   void logStateStore(TopologyTestDriver testDriver)
   {
-    KeyValueIterator i = testDriver.getTimestampedWindowStore("KSTREAM-REDUCE-STATE-STORE-0000000001").all();
+    KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
     while(i.hasNext())
     {
       Object o = i.next();