Remodeled example into a demonstration of an aggregation with a tumbling time window
author Kai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 10:35:19 +0000 (12:35 +0200)
committer Kai Moritz <kai@juplo.de>
Mon, 8 Jul 2024 12:11:11 +0000 (14:11 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index c6bbb4e..94935be 100644 (file)
@@ -3,9 +3,8 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.EmitStrategy;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
@@ -14,6 +13,8 @@ import java.time.Instant;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -22,6 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Slf4j
 public class AggregationTopologyTest
 {
+  static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10));
+
+
   @Test
   public void test()
   {
@@ -32,8 +36,10 @@ public class AggregationTopologyTest
     input
         .peek((k,v) -> log.info("peek-0 -- {} = {}", k, v))
         .groupByKey()
+        .windowedBy(WINDOWS)
+        .emitStrategy(EmitStrategy.onWindowUpdate())
         .reduce((aggregate, value) -> aggregate + "-" + value)
-        .toStream()
+        .toStream((k,v) -> k.toString())
         .peek((k,v) -> log.info("peek-1 -- {} = {}", k, v))
         .to(OUTPUT);
 
@@ -58,40 +64,40 @@ public class AggregationTopologyTest
 
 
     sendAt("A", 63);
-    assertThatOutcomeIs("A");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A"));
 
     sendAt("B", 64);
-    assertThatOutcomeIs("A-B");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B"));
 
     sendAt("C", 65);
-    assertThatOutcomeIs("A-B-C");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C"));
 
     sendAt("D", 66);
-    assertThatOutcomeIs("A-B-C-D");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D"));
 
     sendAt("E", 69);
-    assertThatOutcomeIs("A-B-C-D-E");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
 
     sendAt("F", 70);
-    assertThatOutcomeIs("A-B-C-D-E-F");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F"));
 
     sendAt("G", 74);
-    assertThatOutcomeIs("A-B-C-D-E-F-G");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G"));
 
     sendAt("H", 75);
-    assertThatOutcomeIs("A-B-C-D-E-F-G-H");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
 
     sendAt("I", 100);
-    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
 
     sendAt("J", 120);
-    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
 
     sendAt("K", 140);
-    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
 
     sendAt("L", 160);
-    assertThatOutcomeIs("A-B-C-D-E-F-G-H-I-J-K-L");
+    assertThatOutcomeIs(KeyValue.pair(windowFor(160), "L"));
   }
 
 
@@ -116,12 +122,12 @@ public class AggregationTopologyTest
     in.pipeInput(record);
   }
 
-  void assertThatOutcomeIs(String... expected)
+  void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
   {
     assertThat(outcome()).containsExactly(expected);
   }
 
-  Stream<String> outcome()
+  Stream<KeyValue<Windowed<String>, String>> outcome()
   {
     return out
         .readRecordsToList()
@@ -132,6 +138,34 @@ public class AggregationTopologyTest
             LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
             record.key(),
             record.value()))
-        .map(record -> record.value());
+        .map(record -> KeyValue.pair(parse(record.key()), record.value()));
+  }
+
+
+  static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
+
+  Windowed<String> parse(String serialized)
+  {
+    Matcher matcher = PATTERN.matcher(serialized);
+    // The whole input must have the form "[key@start/end]" described by PATTERN.
+    if (!matcher.matches())
+    {
+      throw new IllegalArgumentException(serialized + " does not match " + PATTERN.pattern());
+    }
+
+    String key = matcher.group(1);
+    String start = matcher.group(2);
+    String end = matcher.group(3);
+    // Rebuild the window from the two numeric groups captured after the '@'.
+    Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
+
+    return new Windowed<>(key, window);
+  }
+
+  Windowed<String> windowFor(int second)
+  {
+    Instant time = Instant.ofEpochSecond(second);
+    long timestamp = time.toEpochMilli();
+    return new Windowed<>(KEY, WINDOWS.windowsFor(timestamp).values().stream().findFirst().get());
   }
 }