From: Kai Moritz Date: Mon, 8 Jul 2024 10:35:19 +0000 (+0200) Subject: Example for an Aggregation with a Tumbling Time Window X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9259d7b201df3d567c5da9acc266df655ccec78f;p=demos%2Fkafka%2Fwordcount Example for an Aggregation with a Tumbling Time Window --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index d21ad47..2f78dc7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,12 +3,16 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -17,6 +21,9 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { + static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)); + + @Test public void test() { @@ -27,8 +34,10 @@ public class AggregationTopologyTest input .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) .groupByKey() + .windowedBy(WINDOWS) + .emitStrategy(EmitStrategy.onWindowUpdate()) .reduce((aggregate, value) -> aggregate + "-" + value) - .toStream() + .toStream((k,v) -> k.toString()) .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) .to(OUTPUT); @@ -53,40 +62,40 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs("A"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A")); sendAt("B", 64); - assertThatOutcomeIs("A-B"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B")); sendAt("C", 65); - assertThatOutcomeIs("A-B-C"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C")); sendAt("D", 66); - assertThatOutcomeIs("A-B-C-D"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D")); sendAt("E", 69); - assertThatOutcomeIs("A-B-C-D-E"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); sendAt("F", 70); - assertThatOutcomeIs("A-B-C-D-E-F"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F")); sendAt("G", 74); - assertThatOutcomeIs("A-B-C-D-E-F-G"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G")); sendAt("H", 75); - assertThatOutcomeIs("A-B-C-D-E-F-G-H"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); sendAt("I", 100); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I"); + assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); sendAt("J", 120); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J"); + assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); sendAt("K", 140); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K"); + assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); sendAt("L", 160); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L"); + assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L")); } @@ -111,12 +120,12 @@ public class AggregationTopologyTest in.pipeInput(record); } - void assertThatOutcomeIs(String... expected) + void assertThatOutcomeIs(KeyValue, String>... expected) { assertThat(outcome()).containsExactly(expected); } - Stream outcome() + Stream, String>> outcome() { return out .readRecordsToList() @@ -126,6 +135,34 @@ public class AggregationTopologyTest record.getRecordTime().toEpochMilli(), record.key(), record.value())) - .map(record -> record.value()); + .map(record -> KeyValue.pair(parse(record.key()), record.value())); + } + + + static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$"); + + Windowed parse(String serialized) + { + Matcher matcher = PATTERN.matcher(serialized); + + if (!matcher.matches()) + { + throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern()); + } + + String key = matcher.group(1); + String start = matcher.group(2); + String end = matcher.group(3); + + Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end)); + + return new Windowed<>(key, window); + } + + Windowed windowFor(int second) + { + Instant time = Instant.ofEpochSecond(second); + long timestamp = time.toEpochMilli(); + return new Windowed<>(KEY, WINDOWS.windowsFor(timestamp).values().stream().findFirst().get()); } }