From 5996d9a3066668e12dd8a1ac15005a189506a815 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 5 Jul 2024 20:58:21 +0200 Subject: [PATCH] Demonstration of the outcome of an Stream/Stream-Inner-Join --- .../counter/StreamStreamJoinTopologyTest.java | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java new file mode 100644 index 0000000..4c26e6c --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java @@ -0,0 +1,130 @@ +package de.juplo.kafka.wordcount.counter; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.test.TestRecord; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoField; +import java.util.Properties; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +public class StreamStreamJoinTopologyTest +{ + static final String LEFT = "TEST-LEFT"; + static final String RIGHT = "TEST-RIGHT"; + static final String JOINED = "TEST-JOINED"; + + static TestInputTopic inLeft; + static TestInputTopic inRight; + static TestOutputTopic outJoined; + + static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0); + + static final TestRecord A = new TestRecord<>("foo", "A", T.minusSeconds(3)); + static final TestRecord a = new TestRecord<>("foo", "a", T.minusSeconds(4)); + static final TestRecord B = new TestRecord<>("foo", "B", T.minusSeconds(5)); + static final TestRecord b = new TestRecord<>("foo", "b", T.minusSeconds(6)); + static final TestRecord C = new TestRecord<>("foo", "C", T.minusSeconds(9)); + static final TestRecord c = new TestRecord<>("foo", "c", T.minusSeconds(10)); + static final TestRecord D = new TestRecord<>("foo", "D", T.minusSeconds(15)); + static final TestRecord d = new TestRecord<>("foo", "d", T.minusSeconds(14)); + static final TestRecord E = new TestRecord<>("foo", "E", T.minusSeconds(40)); + static final TestRecord F = new TestRecord<>("foo", "F", T.minusSeconds(60)); + static final TestRecord f = new TestRecord<>("foo", "f", T.minusSeconds(80)); + static final TestRecord G = new TestRecord<>("foo", "G", T.minusSeconds(100)); + + + @Test + public void test() + { + StreamsBuilder builder = new StreamsBuilder(); + + KStream left = builder.stream(LEFT); + KStream right = builder.stream(RIGHT); + + left + .join( + right, + (valueLeft, valueRight) -> valueLeft + "-" + valueRight, + JoinWindows.ofTimeDifferenceAndGrace( + Duration.ofSeconds(10), + Duration.ofSeconds(5))) + .to(JOINED); + + Topology topology = builder.build(); + + Properties properties = new Properties(); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + inLeft = testDriver.createInputTopic( + LEFT, + Serdes.String().serializer(), + Serdes.String().serializer()); + inRight = testDriver.createInputTopic( + RIGHT, + Serdes.String().serializer(), + Serdes.String().serializer()); + outJoined = testDriver.createOutputTopic( + JOINED, + Serdes.String().deserializer(), + Serdes.String().deserializer()); + + + inLeft.pipeInput(A); + assertThat(outJoined.isEmpty()).isTrue(); + + inRight.pipeInput(a); + assertThat(outcome()).containsExactly("A-a"); + + inLeft.pipeInput(B); + assertThat(outcome()).containsExactly("B-a"); + + inRight.pipeInput(b); + assertThat(outcome()).containsExactlyInAnyOrder("A-b", "B-b"); + + inLeft.pipeInput(C); + assertThat(outcome()).containsExactlyInAnyOrder("C-a", "C-b"); + + inRight.pipeInput(c); + assertThat(outcome()).containsExactlyInAnyOrder("A-c", "B-c", "C-c"); + + inRight.pipeInput(d); + assertThat(outcome()).containsExactlyInAnyOrder("B-d", "C-d"); // ! + + inLeft.pipeInput(D); + assertThat(outcome()).containsExactlyInAnyOrder("D-b", "D-c", "D-d"); + + inLeft.pipeInput(E); + assertThat(noResult()); + + inLeft.pipeInput(F); + assertThat(noResult()); + + inLeft.pipeInput(G); + assertThat(noResult()); + } + + boolean noResult() + { + return outJoined.isEmpty(); + } + + Stream outcome() + { + return outJoined + .readRecordsToList() + .stream() + .map(record -> record.value()); + } +} -- 2.20.1