</parent>
<groupId>de.juplo.kafka.streams.demos</groupId>
- <artifactId>stream-stream--inner-join</artifactId>
+ <artifactId>stream-stream--left-join</artifactId>
<version>1.0.0-SNAPSHOT</version>
- <name>Outcome of an Inner Stream/Stream-Join</name>
+ <name>Outcome of a Left Stream/Stream-Join</name>
<properties>
<java.version>21</java.version>
KStream<String, String> right = builder.stream(RIGHT);
left
- .join(
+ .leftJoin(
right,
(valueLeft, valueRight) -> valueLeft + "-" + valueRight,
JoinWindows.ofTimeDifferenceAndGrace(
assertThatOutcomeIs();
sendLeftAt("F", 60);
- assertThatOutcomeIs();
+ assertThatOutcomeIs("E-null");
sendRightAt("f", 80);
- assertThatOutcomeIs();
+ assertThatOutcomeIs("F-null");
sendLeftAt("G", 100);
assertThatOutcomeIs();
{
TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
log.info(
- "Sending left: {} at time {}",
- record.value(),
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+ "Sending LEFT at {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value());
inLeft.pipeInput(record);
}
{
TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
log.info(
- "Sending right: {} at time {}",
- record.value(),
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+ "Sending RIGHT at {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value());
inRight.pipeInput(record);
}
.readRecordsToList()
.stream()
.peek(record -> log.info(
- "Received join-outcome: {} for time {}",
- record.value(),
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())))
+ "Receiving join for {}: {}",
+ LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.value()))
.map(record -> record.value());
}