Simplified demo-code: removed unnecessary keywords and performed DRY
authorKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 21:06:32 +0000 (23:06 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 21:08:00 +0000 (23:08 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java

index 4c26e6c..8d178d5 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -9,6 +10,8 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.temporal.ChronoField;
 import java.util.Properties;
 import java.util.stream.Stream;
@@ -16,30 +19,21 @@ import java.util.stream.Stream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
+@Slf4j
 public class StreamStreamJoinTopologyTest
 {
-  static final String LEFT = "TEST-LEFT";
-  static final String RIGHT = "TEST-RIGHT";
-  static final String JOINED = "TEST-JOINED";
-
-  static TestInputTopic<String, String> inLeft;
-  static TestInputTopic<String, String> inRight;
-  static TestOutputTopic<String, String> outJoined;
-
-  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-  static final TestRecord<String, String> A = new TestRecord<>("foo", "A", T.minusSeconds(3));
-  static final TestRecord<String, String> a = new TestRecord<>("foo", "a", T.minusSeconds(4));
-  static final TestRecord<String, String> B = new TestRecord<>("foo", "B", T.minusSeconds(5));
-  static final TestRecord<String, String> b = new TestRecord<>("foo", "b", T.minusSeconds(6));
-  static final TestRecord<String, String> C = new TestRecord<>("foo", "C", T.minusSeconds(9));
-  static final TestRecord<String, String> c = new TestRecord<>("foo", "c", T.minusSeconds(10));
-  static final TestRecord<String, String> D = new TestRecord<>("foo", "D", T.minusSeconds(15));
-  static final TestRecord<String, String> d = new TestRecord<>("foo", "d", T.minusSeconds(14));
-  static final TestRecord<String, String> E = new TestRecord<>("foo", "E", T.minusSeconds(40));
-  static final TestRecord<String, String> F = new TestRecord<>("foo", "F", T.minusSeconds(60));
-  static final TestRecord<String, String> f = new TestRecord<>("foo", "f", T.minusSeconds(80));
-  static final TestRecord<String, String> G = new TestRecord<>("foo", "G", T.minusSeconds(100));
+  TestRecord<String, String> A = recordOf("A", 3);
+  TestRecord<String, String> a = recordOf("a", 4);
+  TestRecord<String, String> B = recordOf("B", 5);
+  TestRecord<String, String> b = recordOf("b", 6);
+  TestRecord<String, String> C = recordOf("C", 9);
+  TestRecord<String, String> c = recordOf("c", 10);
+  TestRecord<String, String> D = recordOf("D", 15);
+  TestRecord<String, String> d = recordOf("d", 14);
+  TestRecord<String, String> E = recordOf("E", 40);
+  TestRecord<String, String> F = recordOf("F", 60);
+  TestRecord<String, String> f = recordOf("f", 80);
+  TestRecord<String, String> G = recordOf("G", 100);
 
 
   @Test
@@ -115,16 +109,39 @@ public class StreamStreamJoinTopologyTest
     assertThat(noResult());
   }
 
+
+
   boolean noResult()
   {
     return outJoined.isEmpty();
   }
 
+  TestRecord<String, String> recordOf(String value, int second)
+  {
+    return new TestRecord<>("foo", value, T.minusSeconds(second));
+  }
+
   Stream<String> outcome()
   {
     return outJoined
         .readRecordsToList()
         .stream()
+        .peek(record -> log.info(
+            "Received join-outcome: {} for time {}",
+            record.value(),
+            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault())))
         .map(record -> record.value());
   }
+
+
+  static final String LEFT = "TEST-LEFT";
+  static final String RIGHT = "TEST-RIGHT";
+  static final String JOINED = "TEST-JOINED";
+
+  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+
+  TestInputTopic<String, String> inLeft;
+  TestInputTopic<String, String> inRight;
+  TestOutputTopic<String, String> outJoined;
 }