From 656d4fb60f19d218726e36f27d25ec896a35ddae Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 17 Jul 2024 19:20:38 +0200 Subject: [PATCH] Aligned the example with the example for a Sliding Window * *Note:* Both examples need an explicit configuration for the materialization of the state as store, when suppression is activated! --- .../juplo/kafka/wordcount/counter/AggregationTopologyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index 8b64042..3dfd235 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { + static final String STORE_NAME = "aggregate-store"; static final Duration INACTIVITY_GAP = Duration.ofSeconds(2); static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP); @@ -39,7 +40,7 @@ public class AggregationTopologyTest .windowedBy(WINDOWS) .reduce( (aggregate, value) -> aggregate + "-" + value, - Materialized.>as("aggregated-store") + Materialized.>as(STORE_NAME) .withKeySerde(Serdes.String()) .withValueSerde(Serdes.String())) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) -- 2.20.1