From: Kai Moritz Date: Sat, 6 Jul 2024 07:21:45 +0000 (+0200) Subject: Refined log messages X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3eb93908c0cf8d2144b01251803d9a7f4bc0cebf;p=demos%2Fkafka%2Fwordcount Refined log messages --- diff --git a/pom.xml b/pom.xml index 6063371..4e264b6 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,10 @@ de.juplo.kafka.streams.demos - stream-stream--inner-join + stream-stream--left-join 1.0.0-SNAPSHOT - Outcome of an Inner Stream/Stream-Join + Outcome of a Left Stream/Stream-Join 21 diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java index 39f59b7..d17dcde 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java @@ -31,7 +31,7 @@ public class StreamStreamJoinTopologyTest KStream right = builder.stream(RIGHT); left - .join( + .leftJoin( right, (valueLeft, valueRight) -> valueLeft + "-" + valueRight, JoinWindows.ofTimeDifferenceAndGrace( @@ -89,10 +89,10 @@ public class StreamStreamJoinTopologyTest assertThatOutcomeIs(); sendLeftAt("F", 60); - assertThatOutcomeIs(); + assertThatOutcomeIs("E-null"); sendRightAt("f", 80); - assertThatOutcomeIs(); + assertThatOutcomeIs("F-null"); sendLeftAt("G", 100); assertThatOutcomeIs(); @@ -103,9 +103,9 @@ public class StreamStreamJoinTopologyTest { TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); log.info( - "Sending left: {} at time {}", - record.value(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())); + "Sending LEFT at {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value()); inLeft.pipeInput(record); } @@ -113,9 +113,9 @@ public class StreamStreamJoinTopologyTest { TestRecord record = new TestRecord<>("foo", value, T.plusSeconds(second)); log.info( - "Sending right: {} at time {}", - record.value(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())); + "Sending RIGHT at {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value()); inRight.pipeInput(record); } @@ -135,9 +135,9 @@ public class StreamStreamJoinTopologyTest .readRecordsToList() .stream() .peek(record -> log.info( - "Received join-outcome: {} for time {}", - record.value(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()))) + "Receiving join for {}: {}", + LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.value())) .map(record -> record.value()); }