1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.WindowStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
20 import static org.assertj.core.api.Assertions.assertThat;
24 public class AggregationTopologyTest
// Length of each aggregation window: 10 seconds.
26 static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
// Advance interval: 3 seconds. NOTE(review): presumably applied to WINDOWS via a
// continuation line that is not visible in this excerpt - confirm.
27 static final Duration ADVANCE = Duration.ofSeconds(3);
// Window definition used by the topology: windows of WINDOW_SIZE without a grace period.
// The initializer continues past the lines visible here.
28 static final TimeWindows WINDOWS = TimeWindows
29 .ofSizeWithNoGrace(WINDOW_SIZE)
// Topology under test: consume the INPUT topic as a stream of String keys and values.
36 StreamsBuilder builder = new StreamsBuilder();
38 KStream<String, String> input = builder.stream(INPUT);
// Combiner that appends each new value to the running aggregate, separated by "-".
// NOTE(review): the grouping/windowing operators that receive this lambda are not
// visible in this excerpt.
44 (aggregate, value) -> aggregate + "-" + value,
// The aggregate is materialized in a window store named "aggregated-store",
// with String serdes for both key and value.
45 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
46 .withKeySerde(Serdes.String())
47 .withValueSerde(Serdes.String()))
// Hold results back until the window closes, using an unbounded suppression buffer.
48 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
// Re-key the resulting stream with the textual (toString) form of the original key.
49 .toStream((k,v) -> k.toString())
// Build the topology and log its description.
52 Topology topology = builder.build();
53 log.info("Generated topology: {}", topology.describe());
// Use String serdes as the default key and value serdes.
55 Properties properties = new Properties();
56 properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
57 properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// Run the topology inside a TopologyTestDriver.
59 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
// Input topic handle with String serializers (the topic-name argument is not visible here).
61 in = testDriver.createInputTopic(
63 Serdes.String().serializer(),
64 Serdes.String().serializer());
// Output topic handle with String deserializers (the topic-name argument is not visible here).
65 out = testDriver.createOutputTopic(
67 Serdes.String().deserializer(),
68 Serdes.String().deserializer());
// Expected outcomes. Each assertThatOutcomeIs(...) call compares outcome() against the
// listed pairs using containsExactly; a call without arguments therefore expects no output.
// Keys are built with windowFor(startSecond); values are the "-"-joined inputs of that window.
// NOTE(review): the input steps between these assertions are not visible in this excerpt.
72 assertThatOutcomeIs();
75 assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A"));
78 assertThatOutcomeIs();
81 assertThatOutcomeIs();
84 assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D"));
87 assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
90 assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F"));
93 assertThatOutcomeIs();
// The remaining groups are argument lists of calls whose opening line is not visible here.
97 KeyValue.pair(windowFor(66), "D-E-F-G-H"),
98 KeyValue.pair(windowFor(69), "E-F-G-H"),
99 KeyValue.pair(windowFor(72), "G-H"),
100 KeyValue.pair(windowFor(75), "H"));
104 KeyValue.pair(windowFor(93), "I"),
105 KeyValue.pair(windowFor(96), "I"),
106 KeyValue.pair(windowFor(99), "I"));
110 KeyValue.pair(windowFor(111), "J"),
111 KeyValue.pair(windowFor(114), "J"),
112 KeyValue.pair(windowFor(117), "J"),
113 KeyValue.pair(windowFor(120), "J"));
117 KeyValue.pair(windowFor(132), "K"),
118 KeyValue.pair(windowFor(135), "K"),
119 KeyValue.pair(windowFor(138), "K"));
121 // Never received if no newer message is sent
122 // KeyValue.pair(windowFor(153), "L")
123 // KeyValue.pair(windowFor(156), "L")
124 //KeyValue.pair(windowFor(159), "L")
// Name of the topic the topology reads from.
128 static final String INPUT = "TEST-INPUT";
// NOTE(review): presumably the name of the topic the topology writes to - the sink is not visible here.
129 static final String OUTPUT = "TEST-OUTPUT";
// Record key used by sendAt() and windowFor().
131 static final String KEY = "foo";
// Test-driver handles, assigned from createInputTopic/createOutputTopic during setup.
134 TestInputTopic<String, String> in;
135 TestOutputTopic<String, String> out;
// Pipes one record into the input topic: key KEY, the given value, and a record
// timestamp of `second` seconds after the epoch.
138 void sendAt(String value, int second)
140 TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
// Fragment of a log statement that reports the record time in epoch milliseconds;
// its opening line and remaining arguments are not visible in this excerpt.
142 "Sending @ {}: {} = {}",
143 record.getRecordTime().toEpochMilli(),
146 in.pipeInput(record);
149 void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
151 assertThat(outcome()).containsExactly(expected);
// Yields the received records as a stream of (windowed key, value) pairs.
// NOTE(review): the head of the call chain is not visible in this excerpt; presumably
// the records are read from `out` - confirm.
154 Stream<KeyValue<Windowed<String>, String>> outcome()
// Log every record passing through, including its record time in epoch milliseconds
// (the remaining log arguments are not visible in this excerpt).
159 .peek(record -> log.info(
160 "Received @ {}: {} = {}",
161 record.getRecordTime().toEpochMilli(),
// Turn the textual key back into a Windowed<String> via parse(); the value is passed on unchanged.
164 .map(record -> KeyValue.pair(parse(record.key()), record.value()));
168 static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
170 Windowed<String> parse(String serialized)
172 Matcher matcher = PATTERN.matcher(serialized);
174 if (!matcher.matches())
176 throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
179 String key = matcher.group(1);
180 String start = matcher.group(2);
181 String end = matcher.group(3);
183 Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
185 return new Windowed<>(key, window);
188 Windowed<String> windowFor(int second)
190 Instant startTime = Instant.ofEpochSecond(second);
191 Instant endTime = startTime.plus(WINDOW_SIZE);
192 TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
193 return new Windowed<>(KEY, window);