From: Kai Moritz Date: Mon, 8 Jul 2024 10:35:19 +0000 (+0200) Subject: Remodeled example into a demonstration of an aggregation with a hopping time window X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bfe713ff31202ac940ac221deb0d622ea9d2ec87;p=demos%2Fkafka%2Fwordcount Remodeled example into a demonstration of an aggregation with a hopping time window --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index c6bbb4e..94935be 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,9 +3,8 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.EmitStrategy; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; @@ -14,6 +13,8 @@ import java.time.Instant; import java.time.LocalTime; import java.time.ZoneId; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -22,6 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j public class AggregationTopologyTest { + static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)); + + @Test public void test() { @@ -32,8 +36,10 @@ public class AggregationTopologyTest input .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v)) .groupByKey() + .windowedBy(WINDOWS) + .emitStrategy(EmitStrategy.onWindowUpdate()) .reduce((aggregate, value) -> aggregate + "-" + value) - .toStream() + .toStream((k,v) -> k.toString()) .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v)) .to(OUTPUT); @@ -58,40 +64,40 @@ public class AggregationTopologyTest sendAt("A", 63); - assertThatOutcomeIs("A"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A")); sendAt("B", 64); - assertThatOutcomeIs("A-B"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B")); sendAt("C", 65); - assertThatOutcomeIs("A-B-C"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C")); sendAt("D", 66); - assertThatOutcomeIs("A-B-C-D"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D")); sendAt("E", 69); - assertThatOutcomeIs("A-B-C-D-E"); + assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E")); sendAt("F", 70); - assertThatOutcomeIs("A-B-C-D-E-F"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F")); sendAt("G", 74); - assertThatOutcomeIs("A-B-C-D-E-F-G"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G")); sendAt("H", 75); - assertThatOutcomeIs("A-B-C-D-E-F-G-H"); + assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H")); sendAt("I", 100); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I"); + assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I")); sendAt("J", 120); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J"); + assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J")); sendAt("K", 140); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K"); + assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K")); sendAt("L", 160); - assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L"); + assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L")); } @@ -116,12 +122,12 @@ public class AggregationTopologyTest in.pipeInput(record); } - void assertThatOutcomeIs(String... expected) + void assertThatOutcomeIs(KeyValue, String>... expected) { assertThat(outcome()).containsExactly(expected); } - Stream outcome() + Stream, String>> outcome() { return out .readRecordsToList() @@ -132,6 +138,34 @@ public class AggregationTopologyTest LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), record.key(), record.value())) - .map(record -> record.value()); + .map(record -> KeyValue.pair(parse(record.key()), record.value())); + } + + + static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$"); + + Windowed parse(String serialized) + { + Matcher matcher = PATTERN.matcher(serialized); + + if (!matcher.matches()) + { + throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern()); + } + + String key = matcher.group(1); + String start = matcher.group(2); + String end = matcher.group(3); + + Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end)); + + return new Windowed<>(key, window); + } + + Windowed windowFor(int second) + { + Instant time = Instant.ofEpochSecond(second); + long timestamp = time.toEpochMilli(); + return new Windowed<>(KEY, WINDOWS.windowsFor(timestamp).values().stream().findFirst().get()); } }