--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class StreamStreamJoinTopologyTest
+{
+ static final String LEFT = "TEST-LEFT";
+ static final String RIGHT = "TEST-RIGHT";
+ static final String JOINED = "TEST-JOINED";
+
+ static TestInputTopic<String, String> inLeft;
+ static TestInputTopic<String, String> inRight;
+ static TestOutputTopic<String, String> outJoined;
+
+ static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+ static final TestRecord<String, String> A = new TestRecord<>("foo", "A", T.minusSeconds(3));
+ static final TestRecord<String, String> a = new TestRecord<>("foo", "a", T.minusSeconds(4));
+ static final TestRecord<String, String> B = new TestRecord<>("foo", "B", T.minusSeconds(5));
+ static final TestRecord<String, String> b = new TestRecord<>("foo", "b", T.minusSeconds(6));
+ static final TestRecord<String, String> C = new TestRecord<>("foo", "C", T.minusSeconds(9));
+ static final TestRecord<String, String> c = new TestRecord<>("foo", "c", T.minusSeconds(10));
+ static final TestRecord<String, String> D = new TestRecord<>("foo", "D", T.minusSeconds(15));
+ static final TestRecord<String, String> d = new TestRecord<>("foo", "d", T.minusSeconds(14));
+ static final TestRecord<String, String> E = new TestRecord<>("foo", "E", T.minusSeconds(40));
+ static final TestRecord<String, String> F = new TestRecord<>("foo", "F", T.minusSeconds(60));
+ static final TestRecord<String, String> f = new TestRecord<>("foo", "f", T.minusSeconds(80));
+ static final TestRecord<String, String> G = new TestRecord<>("foo", "G", T.minusSeconds(100));
+
+
+ @Test
+ public void test()
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream<String, String> left = builder.stream(LEFT);
+ KStream<String, String> right = builder.stream(RIGHT);
+
+ left
+ .join(
+ right,
+ (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
+ JoinWindows.ofTimeDifferenceAndGrace(
+ Duration.ofSeconds(10),
+ Duration.ofSeconds(5)))
+ .to(JOINED);
+
+ Topology topology = builder.build();
+
+ Properties properties = new Properties();
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ inLeft = testDriver.createInputTopic(
+ LEFT,
+ Serdes.String().serializer(),
+ Serdes.String().serializer());
+ inRight = testDriver.createInputTopic(
+ RIGHT,
+ Serdes.String().serializer(),
+ Serdes.String().serializer());
+ outJoined = testDriver.createOutputTopic(
+ JOINED,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer());
+
+
+ inLeft.pipeInput(A);
+ assertThat(outJoined.isEmpty()).isTrue();
+
+ inRight.pipeInput(a);
+ assertThat(outcome()).containsExactly("A-a");
+
+ inLeft.pipeInput(B);
+ assertThat(outcome()).containsExactly("B-a");
+
+ inRight.pipeInput(b);
+ assertThat(outcome()).containsExactlyInAnyOrder("A-b", "B-b");
+
+ inLeft.pipeInput(C);
+ assertThat(outcome()).containsExactlyInAnyOrder("C-a", "C-b");
+
+ inRight.pipeInput(c);
+ assertThat(outcome()).containsExactlyInAnyOrder("A-c", "B-c", "C-c");
+
+ inRight.pipeInput(d);
+ assertThat(outcome()).containsExactlyInAnyOrder("B-d", "C-d"); // !
+
+ inLeft.pipeInput(D);
+ assertThat(outcome()).containsExactlyInAnyOrder("D-b", "D-c", "D-d");
+
+ inLeft.pipeInput(E);
+ assertThat(noResult());
+
+ inLeft.pipeInput(F);
+ assertThat(noResult());
+
+ inLeft.pipeInput(G);
+ assertThat(noResult());
+ }
+
+ boolean noResult()
+ {
+ return outJoined.isEmpty();
+ }
+
+ Stream<String> outcome()
+ {
+ return outJoined
+ .readRecordsToList()
+ .stream()
+ .map(record -> record.value());
+ }
+}