Demonstration of the outcome of an Stream/Stream-Inner-Join
authorKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 18:58:21 +0000 (20:58 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 20:31:22 +0000 (22:31 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java
new file mode 100644 (file)
index 0000000..4c26e6c
--- /dev/null
@@ -0,0 +1,130 @@
+package de.juplo.kafka.wordcount.counter;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class StreamStreamJoinTopologyTest
+{
+  static final String LEFT = "TEST-LEFT";
+  static final String RIGHT = "TEST-RIGHT";
+  static final String JOINED = "TEST-JOINED";
+
+  static TestInputTopic<String, String> inLeft;
+  static TestInputTopic<String, String> inRight;
+  static TestOutputTopic<String, String> outJoined;
+
+  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+  static final TestRecord<String, String> A = new TestRecord<>("foo", "A", T.minusSeconds(3));
+  static final TestRecord<String, String> a = new TestRecord<>("foo", "a", T.minusSeconds(4));
+  static final TestRecord<String, String> B = new TestRecord<>("foo", "B", T.minusSeconds(5));
+  static final TestRecord<String, String> b = new TestRecord<>("foo", "b", T.minusSeconds(6));
+  static final TestRecord<String, String> C = new TestRecord<>("foo", "C", T.minusSeconds(9));
+  static final TestRecord<String, String> c = new TestRecord<>("foo", "c", T.minusSeconds(10));
+  static final TestRecord<String, String> D = new TestRecord<>("foo", "D", T.minusSeconds(15));
+  static final TestRecord<String, String> d = new TestRecord<>("foo", "d", T.minusSeconds(14));
+  static final TestRecord<String, String> E = new TestRecord<>("foo", "E", T.minusSeconds(40));
+  static final TestRecord<String, String> F = new TestRecord<>("foo", "F", T.minusSeconds(60));
+  static final TestRecord<String, String> f = new TestRecord<>("foo", "f", T.minusSeconds(80));
+  static final TestRecord<String, String> G = new TestRecord<>("foo", "G", T.minusSeconds(100));
+
+
+  @Test
+  public void test()
+  {
+    StreamsBuilder builder = new StreamsBuilder();
+
+    KStream<String, String> left = builder.stream(LEFT);
+    KStream<String, String> right = builder.stream(RIGHT);
+
+    left
+        .join(
+            right,
+            (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
+            JoinWindows.ofTimeDifferenceAndGrace(
+                Duration.ofSeconds(10),
+                Duration.ofSeconds(5)))
+        .to(JOINED);
+
+    Topology topology = builder.build();
+
+    Properties properties = new Properties();
+    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+    inLeft = testDriver.createInputTopic(
+        LEFT,
+        Serdes.String().serializer(),
+        Serdes.String().serializer());
+    inRight = testDriver.createInputTopic(
+        RIGHT,
+        Serdes.String().serializer(),
+        Serdes.String().serializer());
+    outJoined = testDriver.createOutputTopic(
+        JOINED,
+        Serdes.String().deserializer(),
+        Serdes.String().deserializer());
+
+
+    inLeft.pipeInput(A);
+    assertThat(outJoined.isEmpty()).isTrue();
+
+    inRight.pipeInput(a);
+    assertThat(outcome()).containsExactly("A-a");
+
+    inLeft.pipeInput(B);
+    assertThat(outcome()).containsExactly("B-a");
+
+    inRight.pipeInput(b);
+    assertThat(outcome()).containsExactlyInAnyOrder("A-b", "B-b");
+
+    inLeft.pipeInput(C);
+    assertThat(outcome()).containsExactlyInAnyOrder("C-a", "C-b");
+
+    inRight.pipeInput(c);
+    assertThat(outcome()).containsExactlyInAnyOrder("A-c", "B-c", "C-c");
+
+    inRight.pipeInput(d);
+    assertThat(outcome()).containsExactlyInAnyOrder("B-d", "C-d"); // !
+
+    inLeft.pipeInput(D);
+    assertThat(outcome()).containsExactlyInAnyOrder("D-b", "D-c", "D-d");
+
+    inLeft.pipeInput(E);
+    assertThat(noResult());
+
+    inLeft.pipeInput(F);
+    assertThat(noResult());
+
+    inLeft.pipeInput(G);
+    assertThat(noResult());
+  }
+
+  boolean noResult()
+  {
+    return outJoined.isEmpty();
+  }
+
+  Stream<String> outcome()
+  {
+    return outJoined
+        .readRecordsToList()
+        .stream()
+        .map(record -> record.value());
+  }
+}