1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.WindowStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
20 import static org.assertj.core.api.Assertions.assertThat;
24 public class AggregationTopologyTest
// Size of every aggregation window: ten seconds.
26 static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
// Window definition of size WINDOW_SIZE, created without a grace period.
27 static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE);
// Builder for the topology under test.
33 StreamsBuilder builder = new StreamsBuilder();
// Source stream that reads from the INPUT topic.
35 KStream<String, String> input = builder.stream(INPUT);
// Aggregation step: every new value is appended to the aggregate, separated by "-".
41 (aggregate, value) -> aggregate + "-" + value,
// The aggregate is materialized in the window store "aggregated-store" using String serdes.
42 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
43 .withKeySerde(Serdes.String())
44 .withValueSerde(Serdes.String()))
// Results are held back until the window has closed; the suppression buffer is unbounded.
45 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
// The windowed key is replaced by its toString() form; parse(String) turns it back when the output is read.
46 .toStream((k,v) -> k.toString())
// Build the topology and log its description.
49 Topology topology = builder.build();
50 log.info("Generated topology: {}", topology.describe());
// Default serdes for keys and values are String serdes.
52 Properties properties = new Properties();
53 properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
54 properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// NOTE(review): no close() of the test driver is visible in this view - confirm that it is released after the test.
56 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
// Test input topic with String serializers for key and value.
58 in = testDriver.createInputTopic(
60 Serdes.String().serializer(),
61 Serdes.String().serializer());
// Test output topic with String deserializers for key and value.
62 out = testDriver.createOutputTopic(
64 Serdes.String().deserializer(),
65 Serdes.String().deserializer());
// An assertion without arguments expects that outcome() yields no records at that point.
69 assertThatOutcomeIs();
72 assertThatOutcomeIs();
75 assertThatOutcomeIs();
78 assertThatOutcomeIs();
81 assertThatOutcomeIs();
// First expected result: the window starting at second 60 with the aggregate "A-B-C-D-E".
84 assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
87 assertThatOutcomeIs();
90 assertThatOutcomeIs();
// Expected result for the window starting at second 70.
93 assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
// From here on, every assertion expects exactly one result for a single-value window.
96 assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
99 assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
102 assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
104 // Never received if no newer message is sent:
105 // KeyValue.pair(windowFor(160), "L")
// Name of the topic the topology reads its input from.
109 static final String INPUT = "TEST-INPUT";
// Name of the output topic (presumably the sink of the topology and the topic read by `out` - confirm).
110 static final String OUTPUT = "TEST-OUTPUT";
// The single record key used by sendAt(...) and expected by windowFor(...).
112 static final String KEY = "foo";
// Test input topic; assigned in the test from the test driver.
115 TestInputTopic<String, String> in;
// Test output topic; assigned in the test from the test driver.
116 TestOutputTopic<String, String> out;
// Pipes a record with the key KEY and the given value into the input topic.
// The record timestamp is the given number of seconds since the epoch.
119 void sendAt(String value, int second)
121 TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
// Log template: record time in epoch milliseconds, followed by two further placeholders.
123 "Sending @ {}: {} = {}",
124 record.getRecordTime().toEpochMilli(),
// Hand the record over to the test input topic.
127 in.pipeInput(record);
130 void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
132 assertThat(outcome()).containsExactly(expected);
// Yields the received records as key/value pairs whose String key has been
// parsed back into a Windowed<String>.
// NOTE(review): the head of the stream pipeline is not visible in this view;
// presumably the records are read from the output topic `out` - confirm.
135 Stream<KeyValue<Windowed<String>, String>> outcome()
// Log every record with its record time in epoch milliseconds while passing it on.
140 .peek(record -> log.info(
141 "Received @ {}: {} = {}",
142 record.getRecordTime().toEpochMilli(),
// Turn the String key back into a windowed key via parse(String); the value is kept as is.
145 .map(record -> KeyValue.pair(parse(record.key()), record.value()));
// Matches the complete string "[<key>@<start>/<end>]":
// group 1 is the key (any characters except '@'), groups 2 and 3 are the digits of start and end.
149 static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
151 Windowed<String> parse(String serialized)
153 Matcher matcher = PATTERN.matcher(serialized);
155 if (!matcher.matches())
157 throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
160 String key = matcher.group(1);
161 String start = matcher.group(2);
162 String end = matcher.group(3);
164 Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
166 return new Windowed<>(key, window);
169 Windowed<String> windowFor(int second)
171 Instant startTime = Instant.ofEpochSecond(second);
172 Instant endTime = startTime.plus(WINDOW_SIZE);
173 TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
174 return new Windowed<>(KEY, window);