--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@Slf4j
+public class StreamStreamJoinTopologyTest
+{
+ @Test
+ public void test()
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream<String, String> left = builder.stream(LEFT);
+ KStream<String, String> right = builder.stream(RIGHT);
+
+ left
+ .join(
+ right,
+ (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
+ JoinWindows.ofTimeDifferenceAndGrace(
+ Duration.ofSeconds(10),
+ Duration.ofSeconds(5)))
+ .to(JOINED);
+
+ Topology topology = builder.build();
+
+ Properties properties = new Properties();
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ inLeft = testDriver.createInputTopic(
+ LEFT,
+ Serdes.String().serializer(),
+ Serdes.String().serializer());
+ inRight = testDriver.createInputTopic(
+ RIGHT,
+ Serdes.String().serializer(),
+ Serdes.String().serializer());
+ outJoined = testDriver.createOutputTopic(
+ JOINED,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer());
+
+
+ sendLeftAt("A", 3);
+ assertThatOutcomeIs();
+
+ sendRightAt("a", 4);
+ assertThatOutcomeIs("A-a");
+
+ sendLeftAt("B", 5);
+ assertThatOutcomeIs("B-a");
+
+ sendRightAt("b", 6);
+ assertThatOutcomeIs("A-b", "B-b");
+
+ sendLeftAt("C", 9);
+ assertThatOutcomeIs("C-a", "C-b");
+
+ sendRightAt("c", 10);
+ assertThatOutcomeIs("A-c", "B-c", "C-c");
+
+ sendRightAt("d", 14);
+ assertThatOutcomeIs("B-d", "C-d"); // !
+
+ sendLeftAt("D", 15);
+ assertThatOutcomeIs("D-b", "D-c", "D-d");
+
+ sendLeftAt("E", 40);
+ assertThatOutcomeIs();
+
+ sendLeftAt("F", 60);
+ assertThatOutcomeIs();
+
+ sendRightAt("f", 80);
+ assertThatOutcomeIs();
+
+ sendLeftAt("G", 100);
+ assertThatOutcomeIs();
+ }
+
+
+ void sendLeftAt(String value, int second)
+ {
+ TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ log.info(
+ "Sending LEFT at {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value());
+ inLeft.pipeInput(record);
+ }
+
+ void sendRightAt(String value, int second)
+ {
+ TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ log.info(
+ "Sending RIGHT at {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value());
+ inRight.pipeInput(record);
+ }
+
+ TestRecord<String, String> recordOf(String value, int second)
+ {
+ return new TestRecord<>("foo", value, T.plusSeconds(second));
+ }
+
+ void assertThatOutcomeIs(String... expected)
+ {
+ assertThat(outcome()).containsExactly(expected);
+ }
+
+ Stream<String> outcome()
+ {
+ return outJoined
+ .readRecordsToList()
+ .stream()
+ .peek(record -> log.info(
+ "Receiving join for {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value()))
+ .map(record -> record.value());
+ }
+
+
+ static final String LEFT = "TEST-LEFT";
+ static final String RIGHT = "TEST-RIGHT";
+ static final String JOINED = "TEST-JOINED";
+
+ static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+
+ TestInputTopic<String, String> inLeft;
+ TestInputTopic<String, String> inRight;
+ TestOutputTopic<String, String> outJoined;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.test.TestRecord;
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.temporal.ChronoField;
-import java.util.Properties;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class StreamStreamJoinTopologyTest
-{
- @Test
- public void test()
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- KStream<String, String> left = builder.stream(LEFT);
- KStream<String, String> right = builder.stream(RIGHT);
-
- left
- .join(
- right,
- (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
- JoinWindows.ofTimeDifferenceAndGrace(
- Duration.ofSeconds(10),
- Duration.ofSeconds(5)))
- .to(JOINED);
-
- Topology topology = builder.build();
-
- Properties properties = new Properties();
- properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-
- TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
-
- inLeft = testDriver.createInputTopic(
- LEFT,
- Serdes.String().serializer(),
- Serdes.String().serializer());
- inRight = testDriver.createInputTopic(
- RIGHT,
- Serdes.String().serializer(),
- Serdes.String().serializer());
- outJoined = testDriver.createOutputTopic(
- JOINED,
- Serdes.String().deserializer(),
- Serdes.String().deserializer());
-
-
- sendLeftAt("A", 3);
- assertThatOutcomeIs();
-
- sendRightAt("a", 4);
- assertThatOutcomeIs("A-a");
-
- sendLeftAt("B", 5);
- assertThatOutcomeIs("B-a");
-
- sendRightAt("b", 6);
- assertThatOutcomeIs("A-b", "B-b");
-
- sendLeftAt("C", 9);
- assertThatOutcomeIs("C-a", "C-b");
-
- sendRightAt("c", 10);
- assertThatOutcomeIs("A-c", "B-c", "C-c");
-
- sendRightAt("d", 14);
- assertThatOutcomeIs("B-d", "C-d"); // !
-
- sendLeftAt("D", 15);
- assertThatOutcomeIs("D-b", "D-c", "D-d");
-
- sendLeftAt("E", 40);
- assertThatOutcomeIs();
-
- sendLeftAt("F", 60);
- assertThatOutcomeIs();
-
- sendRightAt("f", 80);
- assertThatOutcomeIs();
-
- sendLeftAt("G", 100);
- assertThatOutcomeIs();
- }
-
-
- void sendLeftAt(String value, int second)
- {
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
- log.info(
- "Sending LEFT at {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
- record.value());
- inLeft.pipeInput(record);
- }
-
- void sendRightAt(String value, int second)
- {
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
- log.info(
- "Sending RIGHT at {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
- record.value());
- inRight.pipeInput(record);
- }
-
- TestRecord<String, String> recordOf(String value, int second)
- {
- return new TestRecord<>("foo", value, T.plusSeconds(second));
- }
-
- void assertThatOutcomeIs(String... expected)
- {
- assertThat(outcome()).containsExactly(expected);
- }
-
- Stream<String> outcome()
- {
- return outJoined
- .readRecordsToList()
- .stream()
- .peek(record -> log.info(
- "Receiving join for {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
- record.value()))
- .map(record -> record.value());
- }
-
-
- static final String LEFT = "TEST-LEFT";
- static final String RIGHT = "TEST-RIGHT";
- static final String JOINED = "TEST-JOINED";
-
- static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
- TestInputTopic<String, String> inLeft;
- TestInputTopic<String, String> inRight;
- TestOutputTopic<String, String> outJoined;
-}