From: Kai Moritz Date: Mon, 8 Jul 2024 07:23:15 +0000 (+0200) Subject: Renaming for demonstration of an aggregation -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=830023fc02f4681f27bf6793b77b2a04082508b6;p=demos%2Fkafka%2Fwordcount Renaming for demonstration of an aggregation -- MOVE --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java new file mode 100644 index 0000000..f34dfbe --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -0,0 +1,155 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.test.TestRecord; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.temporal.ChronoField; +import java.util.Properties; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +@Slf4j +public class StreamStreamJoinTopologyTest +{ + @Test + public void test() + { + StreamsBuilder builder = new StreamsBuilder(); + + KStream left = builder.stream(LEFT); + KStream right = builder.stream(RIGHT); + + left + .join( + right, + (valueLeft, valueRight) -> valueLeft + "-" + valueRight, + JoinWindows.ofTimeDifferenceAndGrace( + Duration.ofSeconds(10), + Duration.ofSeconds(5))) + .to(JOINED); + + Topology topology = builder.build(); + + Properties properties = new Properties(); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + inLeft = testDriver.createInputTopic( + LEFT, + Serdes.String().serializer(), + Serdes.String().serializer()); + inRight = testDriver.createInputTopic( + RIGHT, + Serdes.String().serializer(), + Serdes.String().serializer()); + outJoined = testDriver.createOutputTopic( + JOINED, + Serdes.String().deserializer(), + Serdes.String().deserializer()); + + + sendLeftAt("A", 3); + assertThatOutcomeIs(); + + sendRightAt("a", 4); + assertThatOutcomeIs("A-a"); + + sendLeftAt("B", 5); + assertThatOutcomeIs("B-a"); + + sendRightAt("b", 6); + assertThatOutcomeIs("A-b", "B-b"); + + sendLeftAt("C", 9); + assertThatOutcomeIs("C-a", "C-b"); + + sendRightAt("c", 10); + assertThatOutcomeIs("A-c", "B-c", "C-c"); + + sendRightAt("d", 14); + assertThatOutcomeIs("B-d", "C-d"); // ! + + sendLeftAt("D", 15); + assertThatOutcomeIs("D-b", "D-c", "D-d"); + + sendLeftAt("E", 40); + assertThatOutcomeIs(); + + sendLeftAt("F", 60); + assertThatOutcomeIs(); + + sendRightAt("f", 80); + assertThatOutcomeIs(); + + sendLeftAt("G", 100); + assertThatOutcomeIs(); + } + + + void sendLeftAt(String value, int second) + { + TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + log.info( + "Sending LEFT at {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value()); + inLeft.pipeInput(record); + } + + void sendRightAt(String value, int second) + { + TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); + log.info( + "Sending RIGHT at {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value()); + inRight.pipeInput(record); + } + + TestRecord recordOf(String value, int second) + { + return new TestRecord<>("foo", value, T.plusSeconds(second)); + } + + void assertThatOutcomeIs(String... expected) + { + assertThat(outcome()).containsExactly(expected); + } + + Stream outcome() + { + return outJoined + .readRecordsToList() + .stream() + .peek(record -> log.info( + "Receiving join for {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value())) + .map(record -> record.value()); + } + + + static final String LEFT = "TEST-LEFT"; + static final String RIGHT = "TEST-RIGHT"; + static final String JOINED = "TEST-JOINED"; + + static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); + + + TestInputTopic inLeft; + TestInputTopic inRight; + TestOutputTopic outJoined; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java deleted file mode 100644 index f34dfbe..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java +++ /dev/null @@ -1,155 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.test.TestRecord; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; -import java.time.temporal.ChronoField; -import java.util.Properties; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -@Slf4j -public class StreamStreamJoinTopologyTest -{ - @Test - public void test() - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream left = builder.stream(LEFT); - KStream right = builder.stream(RIGHT); - - left - .join( - right, - (valueLeft, valueRight) -> valueLeft + "-" + valueRight, - JoinWindows.ofTimeDifferenceAndGrace( - Duration.ofSeconds(10), - Duration.ofSeconds(5))) - .to(JOINED); - - Topology topology = builder.build(); - - Properties properties = new Properties(); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - - TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); - - inLeft = testDriver.createInputTopic( - LEFT, - Serdes.String().serializer(), - Serdes.String().serializer()); - inRight = testDriver.createInputTopic( - RIGHT, - Serdes.String().serializer(), - Serdes.String().serializer()); - outJoined = testDriver.createOutputTopic( - JOINED, - Serdes.String().deserializer(), - Serdes.String().deserializer()); - - - sendLeftAt("A", 3); - assertThatOutcomeIs(); - - sendRightAt("a", 4); - assertThatOutcomeIs("A-a"); - - sendLeftAt("B", 5); - assertThatOutcomeIs("B-a"); - - sendRightAt("b", 6); - assertThatOutcomeIs("A-b", "B-b"); - - sendLeftAt("C", 9); - assertThatOutcomeIs("C-a", "C-b"); - - sendRightAt("c", 10); - assertThatOutcomeIs("A-c", "B-c", "C-c"); - - sendRightAt("d", 14); - assertThatOutcomeIs("B-d", "C-d"); // ! - - sendLeftAt("D", 15); - assertThatOutcomeIs("D-b", "D-c", "D-d"); - - sendLeftAt("E", 40); - assertThatOutcomeIs(); - - sendLeftAt("F", 60); - assertThatOutcomeIs(); - - sendRightAt("f", 80); - assertThatOutcomeIs(); - - sendLeftAt("G", 100); - assertThatOutcomeIs(); - } - - - void sendLeftAt(String value, int second) - { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); - log.info( - "Sending LEFT at {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), - record.value()); - inLeft.pipeInput(record); - } - - void sendRightAt(String value, int second) - { - TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); - log.info( - "Sending RIGHT at {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), - record.value()); - inRight.pipeInput(record); - } - - TestRecord recordOf(String value, int second) - { - return new TestRecord<>("foo", value, T.plusSeconds(second)); - } - - void assertThatOutcomeIs(String... expected) - { - assertThat(outcome()).containsExactly(expected); - } - - Stream outcome() - { - return outJoined - .readRecordsToList() - .stream() - .peek(record -> log.info( - "Receiving join for {}: {}", - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), - record.value())) - .map(record -> record.value()); - } - - - static final String LEFT = "TEST-LEFT"; - static final String RIGHT = "TEST-RIGHT"; - static final String JOINED = "TEST-JOINED"; - - static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); - - - TestInputTopic inLeft; - TestInputTopic inRight; - TestOutputTopic outJoined; -}