import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.SlidingWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.test.TestRecord;
@Slf4j
public class AggregationTopologyTest
{
+ static final String STORE_NAME = "aggregate-store";
static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
input
.groupByKey()
.windowedBy(WINDOWS)
- .reduce((aggregate, value) -> aggregate + "-" + value)
+ .reduce(
+ (aggregate, value) -> aggregate + "-" + value,
+ Materialized.as(STORE_NAME))
.toStream((k,v) -> k.toString())
.to(OUTPUT);
void logStateStore(TopologyTestDriver testDriver)
{
- KeyValueIterator i = testDriver.getTimestampedWindowStore("KSTREAM-REDUCE-STATE-STORE-0000000001").all();
+ KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
while(i.hasNext())
{
Object o = i.next();