--- /dev/null
+package de.juplo.kafka.streams.aggregate.;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.test.TestRecord;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@Slf4j
+public class AggregationTopologyTest
+{
+ static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
+ static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
+
+
+ @Test
+ public void test()
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream<String, String> input = builder.stream(INPUT);
+
+ input
+ .groupByKey()
+ .windowedBy(WINDOWS)
+ .reduce((aggregate, value) -> aggregate + "-" + value)
+ .toStream((k,v) -> k.toString())
+ .to(OUTPUT);
+
+ Topology topology = builder.build();
+ log.info("Generated topology: {}", topology.describe());
+
+ Properties properties = new Properties();
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ in = testDriver.createInputTopic(
+ INPUT,
+ Serdes.String().serializer(),
+ Serdes.String().serializer());
+ out = testDriver.createOutputTopic(
+ OUTPUT,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer());
+
+
+ sendAt("A", 63);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(63, 63), "A"));
+
+ sendAt("B", 64);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(63, 63), null),
+ KeyValue.pair(windowFor(63, 64), "A-B"));
+
+ sendAt("C", 65);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(63, 64), null),
+ KeyValue.pair(windowFor(63, 65), "A-B-C"));
+
+ sendAt("D", 66);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(63, 65), null),
+ KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
+
+ sendAt("E", 69);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(69, 69), "E"));
+
+ sendAt("F", 70);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(69, 69), null),
+ KeyValue.pair(windowFor(69, 70), "E-F"));
+
+ sendAt("G", 74);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(74, 74), "G"));
+
+ sendAt("H", 75);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(74, 74), null),
+ KeyValue.pair(windowFor(74, 75), "G-H"));
+
+ sendAt("I", 100);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(100, 100), "I"));
+
+ sendAt("J", 120);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(120, 120), "J"));
+
+ sendAt("K", 140);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(140, 140), "K"));
+
+ sendAt("L", 160);
+ assertThatOutcomeIs(
+ KeyValue.pair(windowFor(160, 160), "L"));
+ }
+
+
+ static final String INPUT = "TEST-INPUT";
+ static final String OUTPUT = "TEST-OUTPUT";
+
+ static final String KEY = "foo";
+
+
+ TestInputTopic<String, String> in;
+ TestOutputTopic<String, String> out;
+
+
+ void sendAt(String value, int second)
+ {
+ TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
+ log.info(
+ "Sending @ {}: {} = {}",
+ record.getRecordTime().toEpochMilli(),
+ record.key(),
+ record.value());
+ in.pipeInput(record);
+ }
+
+ void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
+ {
+ assertThat(outcome()).containsExactly(expected);
+ }
+
+ Stream<KeyValue<Windowed<String>, String>> outcome()
+ {
+ return out
+ .readRecordsToList()
+ .stream()
+ .peek(record -> log.info(
+ "Received @ {}: {} = {}",
+ record.getRecordTime().toEpochMilli(),
+ record.key(),
+ record.value()))
+ .map(record -> KeyValue.pair(parse(record.key()), record.value()));
+ }
+
+
+ static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
+
+ Windowed<String> parse(String serialized)
+ {
+ Matcher matcher = PATTERN.matcher(serialized);
+
+ if (!matcher.matches())
+ {
+ throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
+ }
+
+ String key = matcher.group(1);
+ String start = matcher.group(2);
+ String end = matcher.group(3);
+
+ Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
+
+ return new Windowed<>(key, window);
+ }
+
+ Windowed<String> windowFor(int startSecond, int endSecond)
+ {
+ Instant startTime = Instant.ofEpochSecond(startSecond);
+ Instant endTime = Instant.ofEpochSecond(endSecond);
+ SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
+ return new Windowed<>(KEY, window);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.SessionWindows;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.test.TestRecord;
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class AggregationTopologyTest
-{
- static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
- static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
-
-
- @Test
- public void test()
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- KStream<String, String> input = builder.stream(INPUT);
-
- input
- .groupByKey()
- .windowedBy(WINDOWS)
- .reduce((aggregate, value) -> aggregate + "-" + value)
- .toStream((k,v) -> k.toString())
- .to(OUTPUT);
-
- Topology topology = builder.build();
- log.info("Generated topology: {}", topology.describe());
-
- Properties properties = new Properties();
- properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-
- TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
-
- in = testDriver.createInputTopic(
- INPUT,
- Serdes.String().serializer(),
- Serdes.String().serializer());
- out = testDriver.createOutputTopic(
- OUTPUT,
- Serdes.String().deserializer(),
- Serdes.String().deserializer());
-
-
- sendAt("A", 63);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(63, 63), "A"));
-
- sendAt("B", 64);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(63, 63), null),
- KeyValue.pair(windowFor(63, 64), "A-B"));
-
- sendAt("C", 65);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(63, 64), null),
- KeyValue.pair(windowFor(63, 65), "A-B-C"));
-
- sendAt("D", 66);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(63, 65), null),
- KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
-
- sendAt("E", 69);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(69, 69), "E"));
-
- sendAt("F", 70);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(69, 69), null),
- KeyValue.pair(windowFor(69, 70), "E-F"));
-
- sendAt("G", 74);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(74, 74), "G"));
-
- sendAt("H", 75);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(74, 74), null),
- KeyValue.pair(windowFor(74, 75), "G-H"));
-
- sendAt("I", 100);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(100, 100), "I"));
-
- sendAt("J", 120);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(120, 120), "J"));
-
- sendAt("K", 140);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(140, 140), "K"));
-
- sendAt("L", 160);
- assertThatOutcomeIs(
- KeyValue.pair(windowFor(160, 160), "L"));
- }
-
-
- static final String INPUT = "TEST-INPUT";
- static final String OUTPUT = "TEST-OUTPUT";
-
- static final String KEY = "foo";
-
-
- TestInputTopic<String, String> in;
- TestOutputTopic<String, String> out;
-
-
- void sendAt(String value, int second)
- {
- TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
- log.info(
- "Sending @ {}: {} = {}",
- record.getRecordTime().toEpochMilli(),
- record.key(),
- record.value());
- in.pipeInput(record);
- }
-
- void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
- {
- assertThat(outcome()).containsExactly(expected);
- }
-
- Stream<KeyValue<Windowed<String>, String>> outcome()
- {
- return out
- .readRecordsToList()
- .stream()
- .peek(record -> log.info(
- "Received @ {}: {} = {}",
- record.getRecordTime().toEpochMilli(),
- record.key(),
- record.value()))
- .map(record -> KeyValue.pair(parse(record.key()), record.value()));
- }
-
-
- static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
-
- Windowed<String> parse(String serialized)
- {
- Matcher matcher = PATTERN.matcher(serialized);
-
- if (!matcher.matches())
- {
- throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
- }
-
- String key = matcher.group(1);
- String start = matcher.group(2);
- String end = matcher.group(3);
-
- Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
-
- return new Windowed<>(key, window);
- }
-
- Windowed<String> windowFor(int startSecond, int endSecond)
- {
- Instant startTime = Instant.ofEpochSecond(startSecond);
- Instant endTime = Instant.ofEpochSecond(endSecond);
- SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
- return new Windowed<>(KEY, window);
- }
-}