From: Kai Moritz Date: Wed, 17 Jul 2024 10:33:40 +0000 (+0200) Subject: Renamed test inot `AggregateWindowedSlidingTest` -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0a4281d55125224c6f59c86e3835d48dff0a0faa;p=demos%2Fkafka%2Fwordcount Renamed test inot `AggregateWindowedSlidingTest` -- MOVE --- diff --git a/src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java b/src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java new file mode 100644 index 0000000..95e082d --- /dev/null +++ b/src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java @@ -0,0 +1,240 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.test.TestRecord; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +@Slf4j +public class AggregationTopologyTest +{ + static final String STORE_NAME = "aggregate-store"; + static final Duration WINDOW_SIZE = Duration.ofSeconds(10); + static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE); + + + @Test + public void test() + { + StreamsBuilder builder = new StreamsBuilder(); + + KStream input = builder.stream(INPUT); + + input + .groupByKey() + .windowedBy(WINDOWS) + .reduce( + (aggregate, value) -> aggregate + "-" + value, + Materialized.as(STORE_NAME)) + .toStream((k,v) -> k.toString()) + .to(OUTPUT); + + Topology topology = builder.build(); + log.info("Generated topology: {}", topology.describe()); + + Properties properties = new Properties(); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + in = testDriver.createInputTopic( + INPUT, + Serdes.String().serializer(), + Serdes.String().serializer()); + out = testDriver.createOutputTopic( + OUTPUT, + Serdes.String().deserializer(), + Serdes.String().deserializer()); + + + sendAt("A", 63); + assertThatOutcomeIs( + KeyValue.pair(windowFor(53000), "A")); + + logStateStore(testDriver); + + sendAt("B", 64); + assertThatOutcomeIs( + KeyValue.pair(windowFor(63001), "B"), + KeyValue.pair(windowFor(54000), "A-B")); + + logStateStore(testDriver); + + sendAt("C", 65); + assertThatOutcomeIs( + KeyValue.pair(windowFor(63001), "B-C"), + KeyValue.pair(windowFor(64001), "C"), + KeyValue.pair(windowFor(55000), "A-B-C")); + + logStateStore(testDriver); + + sendAt("D", 66); + assertThatOutcomeIs( + KeyValue.pair(windowFor(64001), "C-D"), + KeyValue.pair(windowFor(63001), "B-C-D"), + KeyValue.pair(windowFor(65001), "D"), + KeyValue.pair(windowFor(56000), "A-B-C-D")); + + logStateStore(testDriver); + + sendAt("E", 69); + assertThatOutcomeIs( + KeyValue.pair(windowFor(65001), "D-E"), + KeyValue.pair(windowFor(64001), "C-D-E"), + KeyValue.pair(windowFor(63001), "B-C-D-E"), + KeyValue.pair(windowFor(66001), "E"), + KeyValue.pair(windowFor(59000), "A-B-C-D-E")); + + logStateStore(testDriver); + + sendAt("F", 70); + assertThatOutcomeIs( + KeyValue.pair(windowFor(66001), "E-F"), + KeyValue.pair(windowFor(65001), "D-E-F"), + KeyValue.pair(windowFor(64001), "C-D-E-F"), + KeyValue.pair(windowFor(63001), "B-C-D-E-F"), + KeyValue.pair(windowFor(69001), "F"), + KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); + + logStateStore(testDriver); + + sendAt("G", 74); + assertThatOutcomeIs( + KeyValue.pair(windowFor(69001), "F-G"), + KeyValue.pair(windowFor(66001), "E-F-G"), + KeyValue.pair(windowFor(65001), "D-E-F-G"), + KeyValue.pair(windowFor(64001), "C-D-E-F-G"), + KeyValue.pair(windowFor(70001), "G"), + KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); + + logStateStore(testDriver); + + sendAt("H", 75); + assertThatOutcomeIs( + KeyValue.pair(windowFor(70001), "G-H"), + KeyValue.pair(windowFor(69001), "F-G-H"), + KeyValue.pair(windowFor(66001), "E-F-G-H"), + KeyValue.pair(windowFor(65001), "D-E-F-G-H"), + KeyValue.pair(windowFor(74001), "H"), + KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); + + logStateStore(testDriver); + + sendAt("I", 100); + assertThatOutcomeIs( + KeyValue.pair(windowFor(90000), "I")); + + logStateStore(testDriver); + + sendAt("J", 120); + assertThatOutcomeIs( + KeyValue.pair(windowFor(110000), "J")); + + logStateStore(testDriver); + + sendAt("K", 140); + assertThatOutcomeIs( + KeyValue.pair(windowFor(130000), "K")); + + logStateStore(testDriver); + + sendAt("L", 160); + assertThatOutcomeIs( + KeyValue.pair(windowFor(150000), "L")); + + logStateStore(testDriver); + } + + + static final String INPUT = "TEST-INPUT"; + static final String OUTPUT = "TEST-OUTPUT"; + + static final String KEY = "foo"; + + + TestInputTopic in; + TestOutputTopic out; + + + void sendAt(String value, int second) + { + TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); + log.info( + "Sending @ {}: {} = {}", + record.getRecordTime().toEpochMilli(), + record.key(), + record.value()); + in.pipeInput(record); + } + + void assertThatOutcomeIs(KeyValue, String>... expected) + { + assertThat(outcome()).containsExactly(expected); + } + + Stream, String>> outcome() + { + return out + .readRecordsToList() + .stream() + .peek(record -> log.info( + "Received @ {}: {} = {}", + record.getRecordTime().toEpochMilli(), + record.key(), + record.value())) + .map(record -> KeyValue.pair(parse(record.key()), record.value())); + } + + void logStateStore(TopologyTestDriver testDriver) + { + KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all(); + while(i.hasNext()) + { + Object o = i.next(); + log.info("{}", o); + } + } + + static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$"); + + Windowed parse(String serialized) + { + Matcher matcher = PATTERN.matcher(serialized); + + if (!matcher.matches()) + { + throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern()); + } + + String key = matcher.group(1); + String start = matcher.group(2); + String end = matcher.group(3); + + Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end)); + + return new Windowed<>(key, window); + } + + Windowed windowFor(long milli) + { + Instant startTime = Instant.ofEpochMilli(milli); + Instant endTime = startTime.plus(WINDOW_SIZE); + TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli()); + return new Windowed<>(KEY, window); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java deleted file mode 100644 index 95e082d..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ /dev/null @@ -1,240 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.test.TestRecord; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -@Slf4j -public class AggregationTopologyTest -{ - static final String STORE_NAME = "aggregate-store"; - static final Duration WINDOW_SIZE = Duration.ofSeconds(10); - static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE); - - - @Test - public void test() - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream input = builder.stream(INPUT); - - input - .groupByKey() - .windowedBy(WINDOWS) - .reduce( - (aggregate, value) -> aggregate + "-" + value, - Materialized.as(STORE_NAME)) - .toStream((k,v) -> k.toString()) - .to(OUTPUT); - - Topology topology = builder.build(); - log.info("Generated topology: {}", topology.describe()); - - Properties properties = new Properties(); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - - TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); - - in = testDriver.createInputTopic( - INPUT, - Serdes.String().serializer(), - Serdes.String().serializer()); - out = testDriver.createOutputTopic( - OUTPUT, - Serdes.String().deserializer(), - Serdes.String().deserializer()); - - - sendAt("A", 63); - assertThatOutcomeIs( - KeyValue.pair(windowFor(53000), "A")); - - logStateStore(testDriver); - - sendAt("B", 64); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B"), - KeyValue.pair(windowFor(54000), "A-B")); - - logStateStore(testDriver); - - sendAt("C", 65); - assertThatOutcomeIs( - KeyValue.pair(windowFor(63001), "B-C"), - KeyValue.pair(windowFor(64001), "C"), - KeyValue.pair(windowFor(55000), "A-B-C")); - - logStateStore(testDriver); - - sendAt("D", 66); - assertThatOutcomeIs( - KeyValue.pair(windowFor(64001), "C-D"), - KeyValue.pair(windowFor(63001), "B-C-D"), - KeyValue.pair(windowFor(65001), "D"), - KeyValue.pair(windowFor(56000), "A-B-C-D")); - - logStateStore(testDriver); - - sendAt("E", 69); - assertThatOutcomeIs( - KeyValue.pair(windowFor(65001), "D-E"), - KeyValue.pair(windowFor(64001), "C-D-E"), - KeyValue.pair(windowFor(63001), "B-C-D-E"), - KeyValue.pair(windowFor(66001), "E"), - KeyValue.pair(windowFor(59000), "A-B-C-D-E")); - - logStateStore(testDriver); - - sendAt("F", 70); - assertThatOutcomeIs( - KeyValue.pair(windowFor(66001), "E-F"), - KeyValue.pair(windowFor(65001), "D-E-F"), - KeyValue.pair(windowFor(64001), "C-D-E-F"), - KeyValue.pair(windowFor(63001), "B-C-D-E-F"), - KeyValue.pair(windowFor(69001), "F"), - KeyValue.pair(windowFor(60000), "A-B-C-D-E-F")); - - logStateStore(testDriver); - - sendAt("G", 74); - assertThatOutcomeIs( - KeyValue.pair(windowFor(69001), "F-G"), - KeyValue.pair(windowFor(66001), "E-F-G"), - KeyValue.pair(windowFor(65001), "D-E-F-G"), - KeyValue.pair(windowFor(64001), "C-D-E-F-G"), - KeyValue.pair(windowFor(70001), "G"), - KeyValue.pair(windowFor(64000), "B-C-D-E-F-G")); - - logStateStore(testDriver); - - sendAt("H", 75); - assertThatOutcomeIs( - KeyValue.pair(windowFor(70001), "G-H"), - KeyValue.pair(windowFor(69001), "F-G-H"), - KeyValue.pair(windowFor(66001), "E-F-G-H"), - KeyValue.pair(windowFor(65001), "D-E-F-G-H"), - KeyValue.pair(windowFor(74001), "H"), - KeyValue.pair(windowFor(65000), "C-D-E-F-G-H")); - - logStateStore(testDriver); - - sendAt("I", 100); - assertThatOutcomeIs( - KeyValue.pair(windowFor(90000), "I")); - - logStateStore(testDriver); - - sendAt("J", 120); - assertThatOutcomeIs( - KeyValue.pair(windowFor(110000), "J")); - - logStateStore(testDriver); - - sendAt("K", 140); - assertThatOutcomeIs( - KeyValue.pair(windowFor(130000), "K")); - - logStateStore(testDriver); - - sendAt("L", 160); - assertThatOutcomeIs( - KeyValue.pair(windowFor(150000), "L")); - - logStateStore(testDriver); - } - - - static final String INPUT = "TEST-INPUT"; - static final String OUTPUT = "TEST-OUTPUT"; - - static final String KEY = "foo"; - - - TestInputTopic in; - TestOutputTopic out; - - - void sendAt(String value, int second) - { - TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); - log.info( - "Sending @ {}: {} = {}", - record.getRecordTime().toEpochMilli(), - record.key(), - record.value()); - in.pipeInput(record); - } - - void assertThatOutcomeIs(KeyValue, String>... expected) - { - assertThat(outcome()).containsExactly(expected); - } - - Stream, String>> outcome() - { - return out - .readRecordsToList() - .stream() - .peek(record -> log.info( - "Received @ {}: {} = {}", - record.getRecordTime().toEpochMilli(), - record.key(), - record.value())) - .map(record -> KeyValue.pair(parse(record.key()), record.value())); - } - - void logStateStore(TopologyTestDriver testDriver) - { - KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all(); - while(i.hasNext()) - { - Object o = i.next(); - log.info("{}", o); - } - } - - static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$"); - - Windowed parse(String serialized) - { - Matcher matcher = PATTERN.matcher(serialized); - - if (!matcher.matches()) - { - throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern()); - } - - String key = matcher.group(1); - String start = matcher.group(2); - String end = matcher.group(3); - - Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end)); - - return new Windowed<>(key, window); - } - - Windowed windowFor(long milli) - { - Instant startTime = Instant.ofEpochMilli(milli); - Instant endTime = startTime.plus(WINDOW_SIZE); - TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli()); - return new Windowed<>(KEY, window); - } -}