Renaming for demonstration of an aggregation -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 07:23:15 +0000 (09:23 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 10:40:24 +0000 (12:40 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java [deleted file]

diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java
new file mode 100644 (file)
index 0000000..f34dfbe
--- /dev/null
@@ -0,0 +1,155 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoField;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@Slf4j
+public class StreamStreamJoinTopologyTest
+{
+  @Test
+  public void test()
+  {
+    StreamsBuilder builder = new StreamsBuilder();
+
+    KStream<String, String> left = builder.stream(LEFT);
+    KStream<String, String> right = builder.stream(RIGHT);
+
+    left
+        .join(
+            right,
+            (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
+            JoinWindows.ofTimeDifferenceAndGrace(
+                Duration.ofSeconds(10),
+                Duration.ofSeconds(5)))
+        .to(JOINED);
+
+    Topology topology = builder.build();
+
+    Properties properties = new Properties();
+    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+    inLeft = testDriver.createInputTopic(
+        LEFT,
+        Serdes.String().serializer(),
+        Serdes.String().serializer());
+    inRight = testDriver.createInputTopic(
+        RIGHT,
+        Serdes.String().serializer(),
+        Serdes.String().serializer());
+    outJoined = testDriver.createOutputTopic(
+        JOINED,
+        Serdes.String().deserializer(),
+        Serdes.String().deserializer());
+
+
+    sendLeftAt("A", 3);
+    assertThatOutcomeIs();
+
+    sendRightAt("a", 4);
+    assertThatOutcomeIs("A-a");
+
+    sendLeftAt("B", 5);
+    assertThatOutcomeIs("B-a");
+
+    sendRightAt("b", 6);
+    assertThatOutcomeIs("A-b", "B-b");
+
+    sendLeftAt("C", 9);
+    assertThatOutcomeIs("C-a", "C-b");
+
+    sendRightAt("c", 10);
+    assertThatOutcomeIs("A-c", "B-c", "C-c");
+
+    sendRightAt("d", 14);
+    assertThatOutcomeIs("B-d", "C-d"); // !
+
+    sendLeftAt("D", 15);
+    assertThatOutcomeIs("D-b", "D-c", "D-d");
+
+    sendLeftAt("E", 40);
+    assertThatOutcomeIs();
+
+    sendLeftAt("F", 60);
+    assertThatOutcomeIs();
+
+    sendRightAt("f", 80);
+    assertThatOutcomeIs();
+
+    sendLeftAt("G", 100);
+    assertThatOutcomeIs();
+  }
+
+
+  void sendLeftAt(String value, int second)
+  {
+    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    log.info(
+        "Sending   LEFT  at {}: {}",
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        record.value());
+    inLeft.pipeInput(record);
+  }
+
+  void sendRightAt(String value, int second)
+  {
+    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    log.info(
+        "Sending   RIGHT at {}: {}",
+        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        record.value());
+    inRight.pipeInput(record);
+  }
+
+  TestRecord<String, String> recordOf(String value, int second)
+  {
+    return new TestRecord<>("foo", value, T.plusSeconds(second));
+  }
+
+  void assertThatOutcomeIs(String... expected)
+  {
+    assertThat(outcome()).containsExactly(expected);
+  }
+
+  Stream<String> outcome()
+  {
+    return outJoined
+        .readRecordsToList()
+        .stream()
+        .peek(record -> log.info(
+            "Receiving join for {}: {}",
+            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+            record.value()))
+        .map(record -> record.value());
+  }
+
+
+  static final String LEFT = "TEST-LEFT";
+  static final String RIGHT = "TEST-RIGHT";
+  static final String JOINED = "TEST-JOINED";
+
+  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
+
+
+  TestInputTopic<String, String> inLeft;
+  TestInputTopic<String, String> inRight;
+  TestOutputTopic<String, String> outJoined;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/StreamStreamJoinTopologyTest.java
deleted file mode 100644 (file)
index f34dfbe..0000000
+++ /dev/null
@@ -1,155 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.test.TestRecord;
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.temporal.ChronoField;
-import java.util.Properties;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class StreamStreamJoinTopologyTest
-{
-  @Test
-  public void test()
-  {
-    StreamsBuilder builder = new StreamsBuilder();
-
-    KStream<String, String> left = builder.stream(LEFT);
-    KStream<String, String> right = builder.stream(RIGHT);
-
-    left
-        .join(
-            right,
-            (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
-            JoinWindows.ofTimeDifferenceAndGrace(
-                Duration.ofSeconds(10),
-                Duration.ofSeconds(5)))
-        .to(JOINED);
-
-    Topology topology = builder.build();
-
-    Properties properties = new Properties();
-    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
-
-    inLeft = testDriver.createInputTopic(
-        LEFT,
-        Serdes.String().serializer(),
-        Serdes.String().serializer());
-    inRight = testDriver.createInputTopic(
-        RIGHT,
-        Serdes.String().serializer(),
-        Serdes.String().serializer());
-    outJoined = testDriver.createOutputTopic(
-        JOINED,
-        Serdes.String().deserializer(),
-        Serdes.String().deserializer());
-
-
-    sendLeftAt("A", 3);
-    assertThatOutcomeIs();
-
-    sendRightAt("a", 4);
-    assertThatOutcomeIs("A-a");
-
-    sendLeftAt("B", 5);
-    assertThatOutcomeIs("B-a");
-
-    sendRightAt("b", 6);
-    assertThatOutcomeIs("A-b", "B-b");
-
-    sendLeftAt("C", 9);
-    assertThatOutcomeIs("C-a", "C-b");
-
-    sendRightAt("c", 10);
-    assertThatOutcomeIs("A-c", "B-c", "C-c");
-
-    sendRightAt("d", 14);
-    assertThatOutcomeIs("B-d", "C-d"); // !
-
-    sendLeftAt("D", 15);
-    assertThatOutcomeIs("D-b", "D-c", "D-d");
-
-    sendLeftAt("E", 40);
-    assertThatOutcomeIs();
-
-    sendLeftAt("F", 60);
-    assertThatOutcomeIs();
-
-    sendRightAt("f", 80);
-    assertThatOutcomeIs();
-
-    sendLeftAt("G", 100);
-    assertThatOutcomeIs();
-  }
-
-
-  void sendLeftAt(String value, int second)
-  {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
-    log.info(
-        "Sending   LEFT  at {}: {}",
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
-        record.value());
-    inLeft.pipeInput(record);
-  }
-
-  void sendRightAt(String value, int second)
-  {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
-    log.info(
-        "Sending   RIGHT at {}: {}",
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
-        record.value());
-    inRight.pipeInput(record);
-  }
-
-  TestRecord<String, String> recordOf(String value, int second)
-  {
-    return new TestRecord<>("foo", value, T.plusSeconds(second));
-  }
-
-  void assertThatOutcomeIs(String... expected)
-  {
-    assertThat(outcome()).containsExactly(expected);
-  }
-
-  Stream<String> outcome()
-  {
-    return outJoined
-        .readRecordsToList()
-        .stream()
-        .peek(record -> log.info(
-            "Receiving join for {}: {}",
-            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
-            record.value()))
-        .map(record -> record.value());
-  }
-
-
-  static final String LEFT = "TEST-LEFT";
-  static final String RIGHT = "TEST-RIGHT";
-  static final String JOINED = "TEST-JOINED";
-
-  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
-  TestInputTopic<String, String> inLeft;
-  TestInputTopic<String, String> inRight;
-  TestOutputTopic<String, String> outJoined;
-}