Aligned the join-example withe the aggregation-examples
authorKai Moritz <kai@juplo.de>
Sat, 13 Jul 2024 09:14:43 +0000 (11:14 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 13 Jul 2024 09:14:43 +0000 (11:14 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/JoinTopologyTest.java

index 9ca35bf..1b0b74b 100644 (file)
@@ -10,9 +10,6 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.temporal.ChronoField;
 import java.util.Properties;
 import java.util.stream.Stream;
 
@@ -22,6 +19,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Slf4j
 public class JoinTopologyTest
 {
+  static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
+  static final Duration GRACE_PERIOD = Duration.ofSeconds(5);
+  static final JoinWindows WINDOWS = JoinWindows.ofTimeDifferenceAndGrace(WINDOW_SIZE, GRACE_PERIOD);
+
+
   @Test
   public void test()
   {
@@ -34,12 +36,11 @@ public class JoinTopologyTest
         .join(
             right,
             (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
-            JoinWindows.ofTimeDifferenceAndGrace(
-                Duration.ofSeconds(10),
-                Duration.ofSeconds(5)))
-        .to(JOINED);
+            WINDOWS)
+        .to(OUTPUT);
 
     Topology topology = builder.build();
+    log.info("Generated topology: {}", topology.describe());
 
     Properties properties = new Properties();
     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
@@ -55,75 +56,84 @@ public class JoinTopologyTest
         RIGHT,
         Serdes.String().serializer(),
         Serdes.String().serializer());
-    outJoined = testDriver.createOutputTopic(
-        JOINED,
+    out = testDriver.createOutputTopic(
+        OUTPUT,
         Serdes.String().deserializer(),
         Serdes.String().deserializer());
 
 
-    sendLeftAt("A", 3);
+    sendLeftAt("A", 63);
     assertThatOutcomeIs();
 
-    sendRightAt("a", 4);
+    sendRightAt("a", 64);
     assertThatOutcomeIs("A-a");
 
-    sendLeftAt("B", 5);
+    sendLeftAt("B", 65);
     assertThatOutcomeIs("B-a");
 
-    sendRightAt("b", 6);
+    sendRightAt("b", 66);
     assertThatOutcomeIs("A-b", "B-b");
 
-    sendLeftAt("C", 9);
+    sendLeftAt("C", 69);
     assertThatOutcomeIs("C-a", "C-b");
 
-    sendRightAt("c", 10);
+    sendRightAt("c", 70);
     assertThatOutcomeIs("A-c", "B-c", "C-c");
 
-    sendRightAt("d", 14);
+    sendRightAt("d", 74);
     assertThatOutcomeIs("B-d", "C-d"); // !
 
-    sendLeftAt("D", 15);
+    sendLeftAt("D", 75);
     assertThatOutcomeIs("D-b", "D-c", "D-d");
 
-    sendLeftAt("E", 40);
+    sendLeftAt("E", 100);
     assertThatOutcomeIs();
 
-    sendLeftAt("F", 60);
+    sendLeftAt("F", 120);
     assertThatOutcomeIs();
 
-    sendRightAt("f", 80);
+    sendRightAt("f", 140);
     assertThatOutcomeIs();
 
-    sendLeftAt("G", 100);
+    sendLeftAt("G", 160);
     assertThatOutcomeIs();
   }
 
 
+  static final String LEFT = "TEST-LEFT";
+  static final String RIGHT = "TEST-RIGHT";
+  static final String OUTPUT = "TEST-OUTPUT";
+
+  static final String KEY = "foo";
+
+
+  TestInputTopic<String, String> inLeft;
+  TestInputTopic<String, String> inRight;
+  TestOutputTopic<String, String> out;
+
+
   void sendLeftAt(String value, int second)
   {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
     log.info(
-        "Sending   LEFT  at {}: {}",
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        "Sending  LEFT  @ {}: {}={}",
+        record.getRecordTime().toEpochMilli(),
+        record.key(),
         record.value());
     inLeft.pipeInput(record);
   }
 
   void sendRightAt(String value, int second)
   {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
     log.info(
-        "Sending   RIGHT at {}: {}",
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        "Sending  RIGHT @ {}: {}={}",
+        record.getRecordTime().toEpochMilli(),
+        record.key(),
         record.value());
     inRight.pipeInput(record);
   }
 
-  TestRecord<String, String> recordOf(String value, int second)
-  {
-    return new TestRecord<>("foo", value, T.plusSeconds(second));
-  }
-
   void assertThatOutcomeIs(String... expected)
   {
     assertThat(outcome()).containsExactly(expected);
@@ -131,25 +141,14 @@ public class JoinTopologyTest
 
   Stream<String> outcome()
   {
-    return outJoined
+    return out
         .readRecordsToList()
         .stream()
         .peek(record -> log.info(
-            "Receiving join for {}: {}",
-            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+            "Received join  @ {}: {}={}",
+            record.getRecordTime().toEpochMilli(),
+            record.key(),
             record.value()))
         .map(record -> record.value());
   }
-
-
-  static final String LEFT = "TEST-LEFT";
-  static final String RIGHT = "TEST-RIGHT";
-  static final String JOINED = "TEST-JOINED";
-
-  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
-  TestInputTopic<String, String> inLeft;
-  TestInputTopic<String, String> inRight;
-  TestOutputTopic<String, String> outJoined;
 }