Refined log messages
authorKai Moritz <kai@juplo.de>
Sat, 6 Jul 2024 07:21:45 +0000 (09:21 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 6 Jul 2024 07:24:53 +0000 (09:24 +0200)
pom.xml
src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java

diff --git a/pom.xml b/pom.xml
index 6063371..4e264b6 100644 (file)
--- a/pom.xml
+++ b/pom.xml
        </parent>
 
        <groupId>de.juplo.kafka.streams.demos</groupId>
-       <artifactId>stream-stream--inner-join</artifactId>
+       <artifactId>stream-stream--left-join</artifactId>
        <version>1.0.0-SNAPSHOT</version>
 
-       <name>Outcome of an Inner Stream/Stream-Join</name>
+       <name>Outcome of a Left Stream/Stream-Join</name>
 
        <properties>
                <java.version>21</java.version>
index 39f59b7..d17dcde 100644 (file)
@@ -31,7 +31,7 @@ public class StreamStreamJoinTopologyTest
     KStream<String, String> right = builder.stream(RIGHT);
 
     left
-        .join(
+        .leftJoin(
             right,
             (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
             JoinWindows.ofTimeDifferenceAndGrace(
@@ -89,10 +89,10 @@ public class StreamStreamJoinTopologyTest
     assertThatOutcomeIs();
 
     sendLeftAt("F", 60);
-    assertThatOutcomeIs();
+    assertThatOutcomeIs("E-null");
 
     sendRightAt("f", 80);
-    assertThatOutcomeIs();
+    assertThatOutcomeIs("F-null");
 
     sendLeftAt("G", 100);
     assertThatOutcomeIs();
@@ -103,9 +103,9 @@ public class StreamStreamJoinTopologyTest
   {
     TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
     log.info(
-        "Sending left: {} at time {}",
-        record.value(),
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+        "Sending   LEFT  at {}: {}",
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        record.value());
     inLeft.pipeInput(record);
   }
 
@@ -113,9 +113,9 @@ public class StreamStreamJoinTopologyTest
   {
     TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
     log.info(
-        "Sending right: {} at time {}",
-        record.value(),
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+        "Sending   RIGHT at {}: {}",
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        record.value());
     inRight.pipeInput(record);
   }
 
@@ -135,9 +135,9 @@ public class StreamStreamJoinTopologyTest
         .readRecordsToList()
         .stream()
         .peek(record -> log.info(
-            "Received join-outcome: {} for time {}",
-            record.value(),
-            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())))
+            "Receiving join for {}: {}",
+            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+            record.value()))
         .map(record -> record.value());
   }