1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.KeyValueIterator;
10 import org.apache.kafka.streams.state.WindowStore;
11 import org.apache.kafka.streams.test.TestRecord;
12 import org.junit.jupiter.api.Test;
14 import java.time.Duration;
15 import java.time.Instant;
16 import java.util.Properties;
17 import java.util.regex.Matcher;
18 import java.util.regex.Pattern;
19 import java.util.stream.Stream;
21 import static org.assertj.core.api.Assertions.assertThat;
25 public class AggregationTopologyTest
27 static final String STORE_NAME = "aggregate-store";
28 static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
29 static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
35 StreamsBuilder builder = new StreamsBuilder();
37 KStream<String, String> input = builder.stream(INPUT);
43 (aggregate, value) -> aggregate + "-" + value,
44 Materialized.<String, String, WindowStore<Bytes, byte[]>>as(STORE_NAME)
45 .withKeySerde(Serdes.String())
46 .withValueSerde(Serdes.String()))
47 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
48 .toStream((k,v) -> k.toString())
51 Topology topology = builder.build();
52 log.info("Generated topology: {}", topology.describe());
54 Properties properties = new Properties();
55 properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
56 properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
58 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
60 in = testDriver.createInputTopic(
62 Serdes.String().serializer(),
63 Serdes.String().serializer());
64 out = testDriver.createOutputTopic(
66 Serdes.String().deserializer(),
67 Serdes.String().deserializer());
72 KeyValue.pair(windowFor(53000), "A"));
74 logStateStore(testDriver);
78 KeyValue.pair(windowFor(54000), "A-B"));
80 logStateStore(testDriver);
84 KeyValue.pair(windowFor(55000), "A-B-C"));
86 logStateStore(testDriver);
90 KeyValue.pair(windowFor(56000), "A-B-C-D"));
92 logStateStore(testDriver);
96 KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
98 logStateStore(testDriver);
102 KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
104 logStateStore(testDriver);
108 KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
109 KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
111 logStateStore(testDriver);
115 KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
116 KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
118 logStateStore(testDriver);
122 KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
123 KeyValue.pair(windowFor(66001), "E-F-G-H"),
124 KeyValue.pair(windowFor(69001), "F-G-H"),
125 KeyValue.pair(windowFor(70001), "G-H"),
126 KeyValue.pair(windowFor(74001), "H"),
127 KeyValue.pair(windowFor(90000), "I"));
129 logStateStore(testDriver);
133 KeyValue.pair(windowFor(110000), "J"));
135 logStateStore(testDriver);
139 KeyValue.pair(windowFor(130000), "K"));
141 logStateStore(testDriver);
145 KeyValue.pair(windowFor(150000), "L"));
147 logStateStore(testDriver);
151 static final String INPUT = "TEST-INPUT";
152 static final String OUTPUT = "TEST-OUTPUT";
154 static final String KEY = "foo";
157 TestInputTopic<String, String> in;
158 TestOutputTopic<String, String> out;
161 void sendAt(String value, int second)
163 TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
165 "Sending @ {}: {} = {}",
166 record.getRecordTime().toEpochMilli(),
169 in.pipeInput(record);
172 void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
174 assertThat(outcome()).containsExactly(expected);
177 Stream<KeyValue<Windowed<String>, String>> outcome()
182 .peek(record -> log.info(
183 "Received @ {}: {} = {}",
184 record.getRecordTime().toEpochMilli(),
187 .map(record -> KeyValue.pair(parse(record.key()), record.value()));
190 void logStateStore(TopologyTestDriver testDriver)
192 KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
200 static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
202 Windowed<String> parse(String serialized)
204 Matcher matcher = PATTERN.matcher(serialized);
206 if (!matcher.matches())
208 throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
211 String key = matcher.group(1);
212 String start = matcher.group(2);
213 String end = matcher.group(3);
215 Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
217 return new Windowed<>(key, window);
220 Windowed<String> windowFor(long milli)
222 Instant startTime = Instant.ofEpochMilli(milli);
223 Instant endTime = startTime.plus(WINDOW_SIZE);
224 TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
225 return new Windowed<>(KEY, window);