1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.SessionWindow;
9 import org.apache.kafka.streams.state.SessionStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
20 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Test of a Kafka Streams topology that reduces String values inside session
 * windows; the topology is executed with a {@code TopologyTestDriver}.
 * NOTE(review): this listing is an extraction in which short lines (braces,
 * annotations, some statements) are missing; the visible text is kept verbatim.
 */
24 public class AggregationTopologyTest
// Name under which the reduce result is materialized as a session store.
26 static final String STORE_NAME = "aggregate-store";
// Inactivity gap used to define the session windows: two seconds.
27 static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
// Session windows built from INACTIVITY_GAP, created without a grace period.
28 static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
// Body of the test.
// NOTE(review): the method header and several statements are not visible in this
// listing (e.g. the operators between builder.stream(...) and the reducer, and
// whatever feeds input between the assertions) -- confirm against the full file.

// --- Topology under test: consume INPUT as a stream of String keys and values.
34 StreamsBuilder builder = new StreamsBuilder();
36 KStream<String, String> input = builder.stream(INPUT);
// Reducer: appends each new value to the running aggregate, separated by "-".
42 (aggregate, value) -> aggregate + "-" + value,
// The aggregate is materialized in a session store named STORE_NAME, with
// String serdes for both key and value.
43 Materialized.<String, String, SessionStore<Bytes, byte[]>>as(STORE_NAME)
44 .withKeySerde(Serdes.String())
45 .withValueSerde(Serdes.String()))
// Suppress intermediate results until the window closes, using an unbounded buffer.
46 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
// Re-key the resulting stream with the String form of the windowed key;
// parse(...) turns that String back into a Windowed<String> for the assertions.
47 .toStream((k,v) -> k.toString())
50 Topology topology = builder.build();
51 log.info("Generated topology: {}", topology.describe());
// --- Driver configuration: String serdes as default for keys and values.
53 Properties properties = new Properties();
54 properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
55 properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// NOTE(review): no close() call on the driver is visible in this listing;
// TopologyTestDriver is Closeable -- consider try-with-resources or a teardown hook.
57 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
// --- Typed handles for writing test input and reading the results, both
// using String (de)serializers for key and value.
59 in = testDriver.createInputTopic(
61 Serdes.String().serializer(),
62 Serdes.String().serializer());
63 out = testDriver.createOutputTopic(
65 Serdes.String().deserializer(),
66 Serdes.String().deserializer());
// --- Expectations. A call without arguments expects no output at that point.
// NOTE(review): presumably each assertion follows a sendAt(...) call that is
// not visible in this listing -- confirm against the full file.
70 assertThatOutcomeIs();
73 assertThatOutcomeIs();
76 assertThatOutcomeIs();
79 assertThatOutcomeIs();
// Expect the session spanning seconds 63..66 with the joined values A-B-C-D.
82 assertThatOutcomeIs(KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
85 assertThatOutcomeIs();
// Expect the session spanning seconds 69..70 with the joined values E-F.
88 assertThatOutcomeIs(KeyValue.pair(windowFor(69, 70), "E-F"));
91 assertThatOutcomeIs();
// Expect the session spanning seconds 74..75 with the joined values G-H.
94 assertThatOutcomeIs(KeyValue.pair(windowFor(74, 75), "G-H"));
// Expect single-record sessions, whose start and end coincide.
97 assertThatOutcomeIs(KeyValue.pair(windowFor(100, 100), "I"));
100 assertThatOutcomeIs(KeyValue.pair(windowFor(120, 120), "J"));
103 assertThatOutcomeIs(KeyValue.pair(windowFor(140, 140), "K"));
105 // Never received, if no newer message is sent
106 // KeyValue.pair(windowFor(160, 160), "L"));
// Name of the topic the topology consumes (passed to builder.stream(...)).
110 static final String INPUT = "TEST-INPUT";
// Name of the output topic.
// NOTE(review): its use is not visible in this listing -- presumably the sink
// of the topology and the topic read via `out`; confirm against the full file.
111 static final String OUTPUT = "TEST-OUTPUT";
// The single record key used by sendAt(...) and expected by windowFor(...).
113 static final String KEY = "foo";
// Handle for piping records into the test driver (assigned from createInputTopic).
116 TestInputTopic<String, String> in;
// Handle for reading results from the test driver (assigned from createOutputTopic).
117 TestOutputTopic<String, String> out;
/**
 * Pipes one record with key {@code KEY} and the given value into the input
 * topic, using {@code second} (seconds since the epoch) as the record time.
 *
 * @param value  the record value
 * @param second the record timestamp in epoch seconds
 */
120 void sendAt(String value, int second)
122 TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
// Log message for the record; its first argument is the record time in epoch millis.
// NOTE(review): the head of the logging call and its remaining arguments are
// not visible in this listing -- presumably the record's key and value.
124 "Sending @ {}: {} = {}",
125 record.getRecordTime().toEpochMilli(),
128 in.pipeInput(record);
131 void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
133 assertThat(outcome()).containsExactly(expected);
/**
 * Provides the received records as a stream of pairs of the parsed windowed
 * key and the record value.
 * NOTE(review): the source of the stream is not visible in this listing --
 * presumably the records read from {@code out}; confirm against the full file.
 *
 * @return the received records, with their String keys converted by parse(...)
 */
136 Stream<KeyValue<Windowed<String>, String>> outcome()
// Log every received record; the first log argument is its record time in epoch millis.
// NOTE(review): the remaining log arguments are not visible in this listing.
141 .peek(record -> log.info(
142 "Received @ {}: {} = {}",
143 record.getRecordTime().toEpochMilli(),
// Turn the String key back into a Windowed<String>, keeping the value as is.
146 .map(record -> KeyValue.pair(parse(record.key()), record.value()));
150 static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
152 Windowed<String> parse(String serialized)
154 Matcher matcher = PATTERN.matcher(serialized);
156 if (!matcher.matches())
158 throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
161 String key = matcher.group(1);
162 String start = matcher.group(2);
163 String end = matcher.group(3);
165 Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
167 return new Windowed<>(key, window);
170 Windowed<String> windowFor(int startSecond, int endSecond)
172 Instant startTime = Instant.ofEpochSecond(startSecond);
173 Instant endTime = Instant.ofEpochSecond(endSecond);
174 SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
175 return new Windowed<>(KEY, window);