Renamed test inot `AggregateWindowedSlidingTest` -- MOVE
authorKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:33:40 +0000 (12:33 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 10:43:07 +0000 (12:43 +0200)
src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java [deleted file]

diff --git a/src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java b/src/test/java/de/juplo/kafka/streams/aggregate/windowed/sliding/AggregateWindowedSlidingTest.java
new file mode 100644 (file)
index 0000000..95e082d
--- /dev/null
@@ -0,0 +1,240 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@Slf4j
+public class AggregationTopologyTest
+{
+  static final String STORE_NAME = "aggregate-store";
+  static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
+  static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
+
+
+  @Test
+  public void test()
+  {
+    StreamsBuilder builder = new StreamsBuilder();
+
+    KStream<String, String> input = builder.stream(INPUT);
+
+    input
+        .groupByKey()
+        .windowedBy(WINDOWS)
+        .reduce(
+            (aggregate, value) -> aggregate + "-" + value,
+            Materialized.as(STORE_NAME))
+        .toStream((k,v) -> k.toString())
+        .to(OUTPUT);
+
+    Topology topology = builder.build();
+    log.info("Generated topology: {}", topology.describe());
+
+    Properties properties = new Properties();
+    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+    in = testDriver.createInputTopic(
+        INPUT,
+        Serdes.String().serializer(),
+        Serdes.String().serializer());
+    out = testDriver.createOutputTopic(
+        OUTPUT,
+        Serdes.String().deserializer(),
+        Serdes.String().deserializer());
+
+
+    sendAt("A", 63);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(53000), "A"));
+
+    logStateStore(testDriver);
+
+    sendAt("B", 64);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(63001), "B"),
+        KeyValue.pair(windowFor(54000), "A-B"));
+
+    logStateStore(testDriver);
+
+    sendAt("C", 65);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(63001), "B-C"),
+        KeyValue.pair(windowFor(64001), "C"),
+        KeyValue.pair(windowFor(55000), "A-B-C"));
+
+    logStateStore(testDriver);
+
+    sendAt("D", 66);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(64001), "C-D"),
+        KeyValue.pair(windowFor(63001), "B-C-D"),
+        KeyValue.pair(windowFor(65001), "D"),
+        KeyValue.pair(windowFor(56000), "A-B-C-D"));
+
+    logStateStore(testDriver);
+
+    sendAt("E", 69);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(65001), "D-E"),
+        KeyValue.pair(windowFor(64001), "C-D-E"),
+        KeyValue.pair(windowFor(63001), "B-C-D-E"),
+        KeyValue.pair(windowFor(66001), "E"),
+        KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
+
+    logStateStore(testDriver);
+
+    sendAt("F", 70);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(66001), "E-F"),
+        KeyValue.pair(windowFor(65001), "D-E-F"),
+        KeyValue.pair(windowFor(64001), "C-D-E-F"),
+        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
+        KeyValue.pair(windowFor(69001), "F"),
+        KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
+
+    logStateStore(testDriver);
+
+    sendAt("G", 74);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(69001), "F-G"),
+        KeyValue.pair(windowFor(66001), "E-F-G"),
+        KeyValue.pair(windowFor(65001), "D-E-F-G"),
+        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
+        KeyValue.pair(windowFor(70001), "G"),
+        KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
+
+    logStateStore(testDriver);
+
+    sendAt("H", 75);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(70001), "G-H"),
+        KeyValue.pair(windowFor(69001), "F-G-H"),
+        KeyValue.pair(windowFor(66001), "E-F-G-H"),
+        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
+        KeyValue.pair(windowFor(74001), "H"),
+        KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
+
+    logStateStore(testDriver);
+
+    sendAt("I", 100);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(90000), "I"));
+
+    logStateStore(testDriver);
+
+    sendAt("J", 120);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(110000), "J"));
+
+    logStateStore(testDriver);
+
+    sendAt("K", 140);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(130000), "K"));
+
+    logStateStore(testDriver);
+
+    sendAt("L", 160);
+    assertThatOutcomeIs(
+        KeyValue.pair(windowFor(150000), "L"));
+
+    logStateStore(testDriver);
+  }
+
+
+  static final String INPUT = "TEST-INPUT";
+  static final String OUTPUT = "TEST-OUTPUT";
+
+  static final String KEY = "foo";
+
+
+  TestInputTopic<String, String> in;
+  TestOutputTopic<String, String> out;
+
+
+  void sendAt(String value, int second)
+  {
+    TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
+    log.info(
+        "Sending  @ {}: {} = {}",
+        record.getRecordTime().toEpochMilli(),
+        record.key(),
+        record.value());
+    in.pipeInput(record);
+  }
+
+  void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
+  {
+    assertThat(outcome()).containsExactly(expected);
+  }
+
+  Stream<KeyValue<Windowed<String>, String>> outcome()
+  {
+    return out
+        .readRecordsToList()
+        .stream()
+        .peek(record -> log.info(
+            "Received @ {}: {} = {}",
+            record.getRecordTime().toEpochMilli(),
+            record.key(),
+            record.value()))
+        .map(record -> KeyValue.pair(parse(record.key()), record.value()));
+  }
+
+  void logStateStore(TopologyTestDriver testDriver)
+  {
+    KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
+    while(i.hasNext())
+    {
+      Object o = i.next();
+      log.info("{}", o);
+    }
+  }
+
+  static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
+
+  Windowed<String> parse(String serialized)
+  {
+    Matcher matcher = PATTERN.matcher(serialized);
+
+    if (!matcher.matches())
+    {
+      throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
+    }
+
+    String key = matcher.group(1);
+    String start = matcher.group(2);
+    String end = matcher.group(3);
+
+    Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
+
+    return new Windowed<>(key, window);
+  }
+
+  Windowed<String> windowFor(long milli)
+  {
+    Instant startTime = Instant.ofEpochMilli(milli);
+    Instant endTime = startTime.plus(WINDOW_SIZE);
+    TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
+    return new Windowed<>(KEY, window);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java
deleted file mode 100644 (file)
index 95e082d..0000000
+++ /dev/null
@@ -1,240 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.test.TestRecord;
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class AggregationTopologyTest
-{
-  static final String STORE_NAME = "aggregate-store";
-  static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
-  static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
-
-
-  @Test
-  public void test()
-  {
-    StreamsBuilder builder = new StreamsBuilder();
-
-    KStream<String, String> input = builder.stream(INPUT);
-
-    input
-        .groupByKey()
-        .windowedBy(WINDOWS)
-        .reduce(
-            (aggregate, value) -> aggregate + "-" + value,
-            Materialized.as(STORE_NAME))
-        .toStream((k,v) -> k.toString())
-        .to(OUTPUT);
-
-    Topology topology = builder.build();
-    log.info("Generated topology: {}", topology.describe());
-
-    Properties properties = new Properties();
-    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-
-    TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
-
-    in = testDriver.createInputTopic(
-        INPUT,
-        Serdes.String().serializer(),
-        Serdes.String().serializer());
-    out = testDriver.createOutputTopic(
-        OUTPUT,
-        Serdes.String().deserializer(),
-        Serdes.String().deserializer());
-
-
-    sendAt("A", 63);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(53000), "A"));
-
-    logStateStore(testDriver);
-
-    sendAt("B", 64);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B"),
-        KeyValue.pair(windowFor(54000), "A-B"));
-
-    logStateStore(testDriver);
-
-    sendAt("C", 65);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(63001), "B-C"),
-        KeyValue.pair(windowFor(64001), "C"),
-        KeyValue.pair(windowFor(55000), "A-B-C"));
-
-    logStateStore(testDriver);
-
-    sendAt("D", 66);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(64001), "C-D"),
-        KeyValue.pair(windowFor(63001), "B-C-D"),
-        KeyValue.pair(windowFor(65001), "D"),
-        KeyValue.pair(windowFor(56000), "A-B-C-D"));
-
-    logStateStore(testDriver);
-
-    sendAt("E", 69);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(65001), "D-E"),
-        KeyValue.pair(windowFor(64001), "C-D-E"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E"),
-        KeyValue.pair(windowFor(66001), "E"),
-        KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
-
-    logStateStore(testDriver);
-
-    sendAt("F", 70);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(66001), "E-F"),
-        KeyValue.pair(windowFor(65001), "D-E-F"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F"),
-        KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
-        KeyValue.pair(windowFor(69001), "F"),
-        KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
-
-    logStateStore(testDriver);
-
-    sendAt("G", 74);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(69001), "F-G"),
-        KeyValue.pair(windowFor(66001), "E-F-G"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G"),
-        KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
-        KeyValue.pair(windowFor(70001), "G"),
-        KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
-
-    logStateStore(testDriver);
-
-    sendAt("H", 75);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(70001), "G-H"),
-        KeyValue.pair(windowFor(69001), "F-G-H"),
-        KeyValue.pair(windowFor(66001), "E-F-G-H"),
-        KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
-        KeyValue.pair(windowFor(74001), "H"),
-        KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
-
-    logStateStore(testDriver);
-
-    sendAt("I", 100);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(90000), "I"));
-
-    logStateStore(testDriver);
-
-    sendAt("J", 120);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(110000), "J"));
-
-    logStateStore(testDriver);
-
-    sendAt("K", 140);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(130000), "K"));
-
-    logStateStore(testDriver);
-
-    sendAt("L", 160);
-    assertThatOutcomeIs(
-        KeyValue.pair(windowFor(150000), "L"));
-
-    logStateStore(testDriver);
-  }
-
-
-  static final String INPUT = "TEST-INPUT";
-  static final String OUTPUT = "TEST-OUTPUT";
-
-  static final String KEY = "foo";
-
-
-  TestInputTopic<String, String> in;
-  TestOutputTopic<String, String> out;
-
-
-  void sendAt(String value, int second)
-  {
-    TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
-    log.info(
-        "Sending  @ {}: {} = {}",
-        record.getRecordTime().toEpochMilli(),
-        record.key(),
-        record.value());
-    in.pipeInput(record);
-  }
-
-  void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
-  {
-    assertThat(outcome()).containsExactly(expected);
-  }
-
-  Stream<KeyValue<Windowed<String>, String>> outcome()
-  {
-    return out
-        .readRecordsToList()
-        .stream()
-        .peek(record -> log.info(
-            "Received @ {}: {} = {}",
-            record.getRecordTime().toEpochMilli(),
-            record.key(),
-            record.value()))
-        .map(record -> KeyValue.pair(parse(record.key()), record.value()));
-  }
-
-  void logStateStore(TopologyTestDriver testDriver)
-  {
-    KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
-    while(i.hasNext())
-    {
-      Object o = i.next();
-      log.info("{}", o);
-    }
-  }
-
-  static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
-
-  Windowed<String> parse(String serialized)
-  {
-    Matcher matcher = PATTERN.matcher(serialized);
-
-    if (!matcher.matches())
-    {
-      throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
-    }
-
-    String key = matcher.group(1);
-    String start = matcher.group(2);
-    String end = matcher.group(3);
-
-    Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
-
-    return new Windowed<>(key, window);
-  }
-
-  Windowed<String> windowFor(long milli)
-  {
-    Instant startTime = Instant.ofEpochMilli(milli);
-    Instant endTime = startTime.plus(WINDOW_SIZE);
-    TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
-    return new Windowed<>(KEY, window);
-  }
-}