@Slf4j
public class AggregationTopologyTest
{
+ static final String STORE_NAME = "aggregate-store";
static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
.windowedBy(WINDOWS)
.reduce(
(aggregate, value) -> aggregate + "-" + value,
- Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated-store")
+ Materialized.<String, String, SessionStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))