Added log-message, when sending a record
authorKai Moritz <kai@juplo.de>
Sat, 6 Jul 2024 07:10:54 +0000 (09:10 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 6 Jul 2024 07:10:54 +0000 (09:10 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java

index 3812e86..39f59b7 100644 (file)
@@ -61,44 +61,63 @@ public class StreamStreamJoinTopologyTest
         Serdes.String().deserializer());
 
 
-    inLeft.pipeInput(recordOf("A", 3));
+    sendLeftAt("A", 3);
     assertThatOutcomeIs();
 
-    inRight.pipeInput(recordOf("a", 4));
+    sendRightAt("a", 4);
     assertThatOutcomeIs("A-a");
 
-    inLeft.pipeInput(recordOf("B", 5));
+    sendLeftAt("B", 5);
     assertThatOutcomeIs("B-a");
 
-    inRight.pipeInput(recordOf("b", 6));
+    sendRightAt("b", 6);
     assertThatOutcomeIs("A-b", "B-b");
 
-    inLeft.pipeInput(recordOf("C", 9));
+    sendLeftAt("C", 9);
     assertThatOutcomeIs("C-a", "C-b");
 
-    inRight.pipeInput(recordOf("c", 10));
+    sendRightAt("c", 10);
     assertThatOutcomeIs("A-c", "B-c", "C-c");
 
-    inRight.pipeInput(recordOf("d", 14));
+    sendRightAt("d", 14);
     assertThatOutcomeIs("B-d", "C-d"); // !
 
-    inLeft.pipeInput(recordOf("D", 15));
+    sendLeftAt("D", 15);
     assertThatOutcomeIs("D-b", "D-c", "D-d");
 
-    inLeft.pipeInput(recordOf("E", 40));
+    sendLeftAt("E", 40);
     assertThatOutcomeIs();
 
-    inLeft.pipeInput(recordOf("F", 60));
+    sendLeftAt("F", 60);
     assertThatOutcomeIs();
 
-    inRight.pipeInput(recordOf("f", 80));
+    sendRightAt("f", 80);
     assertThatOutcomeIs();
 
-    inLeft.pipeInput(recordOf("G", 100));
+    sendLeftAt("G", 100);
     assertThatOutcomeIs();
   }
 
 
+  void sendLeftAt(String value, int second)
+  {
+    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    log.info(
+        "Sending left: {} at time {}",
+        record.value(),
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+    inLeft.pipeInput(record);
+  }
+
+  void sendRightAt(String value, int second)
+  {
+    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    log.info(
+        "Sending right: {} at time {}",
+        record.value(),
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()));
+    inRight.pipeInput(record);
+  }
 
   TestRecord<String, String> recordOf(String value, int second)
   {