Remodeled the example into a demonstration of a plain endless aggregation
author Kai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 07:40:54 +0000 (09:40 +0200)
committer Kai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 10:42:28 +0000 (12:42 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index fd0ae85..a72d38f 100644 (file)
@@ -3,8 +3,9 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -12,7 +13,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalTime;
 import java.time.ZoneId;
-import java.time.temporal.ChronoField;
 import java.util.Properties;
 import java.util.stream.Stream;
 
@@ -22,106 +22,101 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Slf4j
 public class AggregationTopologyTest
 {
+  static final Instant T0 = Instant.ofEpochSecond(60);
+
+
   @Test
   public void test()
   {
     StreamsBuilder builder = new StreamsBuilder();
 
-    KStream<String, String> left = builder.stream(LEFT);
-    KStream<String, String> right = builder.stream(RIGHT);
+    KStream<String, String> input = builder.stream(INPUT);
 
-    left
-        .join(
-            right,
-            (valueLeft, valueRight) -> valueLeft + "-" + valueRight,
-            JoinWindows.ofTimeDifferenceAndGrace(
-                Duration.ofSeconds(10),
-                Duration.ofSeconds(5)))
-        .to(JOINED);
+    input
+        .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
+        .groupByKey()
+        .reduce((aggregate, value) -> aggregate + "-" + value)
+        .toStream()
+        .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
+        .to(OUTPUT);
 
     Topology topology = builder.build();
+    log.info("Generated topology: {}", topology.describe());
 
     Properties properties = new Properties();
     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+    properties.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0l);
 
     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
 
-    inLeft = testDriver.createInputTopic(
-        LEFT,
-        Serdes.String().serializer(),
-        Serdes.String().serializer());
-    inRight = testDriver.createInputTopic(
-        RIGHT,
+    in = testDriver.createInputTopic(
+        INPUT,
         Serdes.String().serializer(),
         Serdes.String().serializer());
-    outJoined = testDriver.createOutputTopic(
-        JOINED,
+    out = testDriver.createOutputTopic(
+        OUTPUT,
         Serdes.String().deserializer(),
         Serdes.String().deserializer());
 
 
-    sendLeftAt("A", 3);
-    assertThatOutcomeIs();
+    sendAt("A", 3);
+    assertThatOutcomeIs("A");
 
-    sendRightAt("a", 4);
-    assertThatOutcomeIs("A-a");
+    sendAt("B", 4);
+    assertThatOutcomeIs("A-B");
 
-    sendLeftAt("B", 5);
-    assertThatOutcomeIs("B-a");
+    sendAt("C", 5);
+    assertThatOutcomeIs("A-B-C");
 
-    sendRightAt("b", 6);
-    assertThatOutcomeIs("A-b", "B-b");
+    sendAt("D", 6);
+    assertThatOutcomeIs("A-B-C-D");
 
-    sendLeftAt("C", 9);
-    assertThatOutcomeIs("C-a", "C-b");
+    sendAt("E", 9);
+    assertThatOutcomeIs("A-B-C-D-E");
 
-    sendRightAt("c", 10);
-    assertThatOutcomeIs("A-c", "B-c", "C-c");
+    sendAt("F", 10);
+    assertThatOutcomeIs("A-B-C-D-E-F");
 
-    sendRightAt("d", 14);
-    assertThatOutcomeIs("B-d", "C-d"); // !
+    sendAt("G", 14);
+    assertThatOutcomeIs("A-B-C-D-E-F-G");
 
-    sendLeftAt("D", 15);
-    assertThatOutcomeIs("D-b", "D-c", "D-d");
+    sendAt("H", 15);
+    assertThatOutcomeIs("A-B-C-D-E-F-G-H");
 
-    sendLeftAt("E", 40);
-    assertThatOutcomeIs();
+    sendAt("I", 40);
+    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I");
 
-    sendLeftAt("F", 60);
-    assertThatOutcomeIs();
+    sendAt("J", 60);
+    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J");
 
-    sendRightAt("f", 80);
-    assertThatOutcomeIs();
+    sendAt("K", 80);
+    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K");
 
-    sendLeftAt("G", 100);
-    assertThatOutcomeIs();
+    sendAt("L", 100);
+    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L");
   }
 
 
-  void sendLeftAt(String value, int second)
-  {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
-    log.info(
-        "Sending   LEFT  at {}: {}",
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
-        record.value());
-    inLeft.pipeInput(record);
-  }
+  static final String INPUT = "TEST-INPUT";
+  static final String OUTPUT = "TEST-OUTPUT";
+
+  static final String KEY = "foo";
+
 
-  void sendRightAt(String value, int second)
+  TestInputTopic<String, String> in;
+  TestOutputTopic<String, String> out;
+
+
+  void sendAt(String value, int second)
   {
-    TestRecord<String, String> record = new TestRecord<>("foo", value, T.plusSeconds(second));
+    TestRecord<String, String> record = new TestRecord<>(KEY, value, T0.plusSeconds(second));
     log.info(
-        "Sending   RIGHT at {}: {}",
+        "Sending at {} ({}): {}",
+        record.getRecordTime().toEpochMilli(),
         LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
         record.value());
-    inRight.pipeInput(record);
-  }
-
-  TestRecord<String, String> recordOf(String value, int second)
-  {
-    return new TestRecord<>("foo", value, T.plusSeconds(second));
+    in.pipeInput(record);
   }
 
   void assertThatOutcomeIs(String... expected)
@@ -131,25 +126,15 @@ public class AggregationTopologyTest
 
   Stream<String> outcome()
   {
-    return outJoined
+    return out
         .readRecordsToList()
         .stream()
         .peek(record -> log.info(
-            "Receiving join for {}: {}",
+            "Received for {} ({}): {}={}",
+            record.getRecordTime().toEpochMilli(),
             LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+            record.key(),
             record.value()))
         .map(record -> record.value());
   }
-
-
-  static final String LEFT = "TEST-LEFT";
-  static final String RIGHT = "TEST-RIGHT";
-  static final String JOINED = "TEST-JOINED";
-
-  static final Instant T = Instant.now().with(ChronoField.MILLI_OF_SECOND, 0);
-
-
-  TestInputTopic<String, String> inLeft;
-  TestInputTopic<String, String> inRight;
-  TestOutputTopic<String, String> outJoined;
 }