import java.time.Duration;
import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.temporal.ChronoField;
import java.util.Properties;
import java.util.stream.Stream;
@Slf4j
public class JoinTopologyTest
{
+ static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
+ static final Duration GRACE_PERIOD = Duration.ofSeconds(5);
+ static final JoinWindows WINDOWS = JoinWindows.ofTimeDifferenceAndGrace(WINDOW_SIZE, GRACE_PERIOD);
+
+
@Test
public void test()
{
.join(
right,
(valueLeft, valueRight) -> valueLeft + "-" + valueRight,
- JoinWindows.ofTimeDifferenceAndGrace(
- Duration.ofSeconds(10),
- Duration.ofSeconds(5)))
- .to(JOINED);
+ WINDOWS)
+ .to(OUTPUT);
Topology topology = builder.build();
+ log.info("Generated topology: {}", topology.describe());
Properties properties = new Properties();
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
RIGHT,
Serdes.String().serializer(),
Serdes.String().serializer());
- outJoined = testDriver.createOutputTopic(
- JOINED,
+ out = testDriver.createOutputTopic(
+ OUTPUT,
Serdes.String().deserializer(),
Serdes.String().deserializer());
- sendLeftAt("A", 3);
+ sendLeftAt("A", 63);
assertThatOutcomeIs();
- sendRightAt("a", 4);
+ sendRightAt("a", 64);
assertThatOutcomeIs("A-a");
- sendLeftAt("B", 5);
+ sendLeftAt("B", 65);
assertThatOutcomeIs("B-a");
- sendRightAt("b", 6);
+ sendRightAt("b", 66);
assertThatOutcomeIs("A-b", "B-b");
- sendLeftAt("C", 9);
+ sendLeftAt("C", 69);
assertThatOutcomeIs("C-a", "C-b");
- sendRightAt("c", 10);
+ sendRightAt("c", 70);
assertThatOutcomeIs("A-c", "B-c", "C-c");
- sendRightAt("d", 14);
+ sendRightAt("d", 74);
assertThatOutcomeIs("B-d", "C-d"); // !
- sendLeftAt("D", 15);
+ sendLeftAt("D", 75);
assertThatOutcomeIs("D-b", "D-c", "D-d");
- sendLeftAt("E", 40);
+ sendLeftAt("E", 100);
assertThatOutcomeIs();
- sendLeftAt("F", 60);
+ sendLeftAt("F", 120);
assertThatOutcomeIs();
- sendRightAt("f", 80);
+ sendRightAt("f", 140);
assertThatOutcomeIs();
- sendLeftAt("G", 100);
+ sendLeftAt("G", 160);
assertThatOutcomeIs();
}
+ static final String LEFT = "TEST-LEFT";
+ static final String RIGHT = "TEST-RIGHT";
+ static final String OUTPUT = "TEST-OUTPUT";
+
+ static final String KEY = "foo";
+
+
+ TestInputTopic<String, String> inLeft;
+ TestInputTopic<String, String> inRight;
+ TestOutputTopic<String, String> out;
+
+
void sendLeftAt(String value, int second)
{
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
log.info(
- "Sending LEFT at {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ "Sending LEFT @ {}: {}={}",
+ record.getRecordTime().toEpochMilli(),
+ record.key(),
record.value());
inLeft.pipeInput(record);
}
void sendRightAt(String value, int second)
{
- TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+ TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
log.info(
- "Sending RIGHT at {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ "Sending RIGHT @ {}: {}={}",
+ record.getRecordTime().toEpochMilli(),
+ record.key(),
record.value());
inRight.pipeInput(record);
}
- TestRecord<String, String> recordOf(String value, int second)
- {
- return new TestRecord<>("foo", value, T.plusSeconds(second));
- }
-
void assertThatOutcomeIs(String... expected)
{
assertThat(outcome()).containsExactly(expected);
Stream<String> outcome()
{
- return outJoined
+ return out
.readRecordsToList()
.stream()
.peek(record -> log.info(
- "Receiving join for {}: {}",
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ "Received join @ {}: {}={}",
+ record.getRecordTime().toEpochMilli(),
+ record.key(),
record.value()))
.map(record -> record.value());
}
-
-
- static final String LEFT = "TEST-LEFT";
- static final String RIGHT = "TEST-RIGHT";
- static final String JOINED = "TEST-JOINED";
-
- static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
- TestInputTopic<String, String> inLeft;
- TestInputTopic<String, String> inRight;
- TestOutputTopic<String, String> outJoined;
}