5c2ff6b39d1eba71a7c64e93cd9bf495c53aaea6
[demos/kafka/wordcount] /
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.KeyValueIterator;
10 import org.apache.kafka.streams.state.WindowStore;
11 import org.apache.kafka.streams.test.TestRecord;
12 import org.junit.jupiter.api.Test;
13
14 import java.time.Duration;
15 import java.time.Instant;
16 import java.util.Properties;
17 import java.util.regex.Matcher;
18 import java.util.regex.Pattern;
19 import java.util.stream.Stream;
20
21 import static org.assertj.core.api.Assertions.assertThat;
22
23
24 @Slf4j
25 public class AggregationTopologyTest
26 {
27   static final String STORE_NAME = "aggregate-store";
28   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
29   static final SlidingWindows WINDOWS = SlidingWindows.ofTimeDifferenceWithNoGrace(WINDOW_SIZE);
30
31
32   @Test
33   public void test()
34   {
35     StreamsBuilder builder = new StreamsBuilder();
36
37     KStream<String, String> input = builder.stream(INPUT);
38
39     input
40         .groupByKey()
41         .windowedBy(WINDOWS)
42         .reduce(
43             (aggregate, value) -> aggregate + "-" + value,
44             Materialized.<String, String, WindowStore<Bytes, byte[]>>as(STORE_NAME)
45                 .withKeySerde(Serdes.String())
46                 .withValueSerde(Serdes.String()))
47         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
48         .toStream((k,v) -> k.toString())
49         .to(OUTPUT);
50
51     Topology topology = builder.build();
52     log.info("Generated topology: {}", topology.describe());
53
54     Properties properties = new Properties();
55     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
56     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
57
58     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
59
60     in = testDriver.createInputTopic(
61         INPUT,
62         Serdes.String().serializer(),
63         Serdes.String().serializer());
64     out = testDriver.createOutputTopic(
65         OUTPUT,
66         Serdes.String().deserializer(),
67         Serdes.String().deserializer());
68
69
70     sendAt("A", 63);
71     assertThatOutcomeIs(
72         KeyValue.pair(windowFor(53000), "A"));
73
74     logStateStore(testDriver);
75
76     sendAt("B", 64);
77     assertThatOutcomeIs(
78         KeyValue.pair(windowFor(54000), "A-B"));
79
80     logStateStore(testDriver);
81
82     sendAt("C", 65);
83     assertThatOutcomeIs(
84         KeyValue.pair(windowFor(55000), "A-B-C"));
85
86     logStateStore(testDriver);
87
88     sendAt("D", 66);
89     assertThatOutcomeIs(
90         KeyValue.pair(windowFor(56000), "A-B-C-D"));
91
92     logStateStore(testDriver);
93
94     sendAt("E", 69);
95     assertThatOutcomeIs(
96         KeyValue.pair(windowFor(59000), "A-B-C-D-E"));
97
98     logStateStore(testDriver);
99
100     sendAt("F", 70);
101     assertThatOutcomeIs(
102         KeyValue.pair(windowFor(60000), "A-B-C-D-E-F"));
103
104     logStateStore(testDriver);
105
106     sendAt("G", 74);
107     assertThatOutcomeIs(
108         KeyValue.pair(windowFor(63001), "B-C-D-E-F"),
109         KeyValue.pair(windowFor(64000), "B-C-D-E-F-G"));
110
111     logStateStore(testDriver);
112
113     sendAt("H", 75);
114     assertThatOutcomeIs(
115         KeyValue.pair(windowFor(64001), "C-D-E-F-G"),
116         KeyValue.pair(windowFor(65000), "C-D-E-F-G-H"));
117
118     logStateStore(testDriver);
119
120     sendAt("I", 100);
121     assertThatOutcomeIs(
122         KeyValue.pair(windowFor(65001), "D-E-F-G-H"),
123         KeyValue.pair(windowFor(66001), "E-F-G-H"),
124         KeyValue.pair(windowFor(69001), "F-G-H"),
125         KeyValue.pair(windowFor(70001), "G-H"),
126         KeyValue.pair(windowFor(74001), "H"),
127         KeyValue.pair(windowFor(90000), "I"));
128
129     logStateStore(testDriver);
130
131     sendAt("J", 120);
132     assertThatOutcomeIs(
133         KeyValue.pair(windowFor(110000), "J"));
134
135     logStateStore(testDriver);
136
137     sendAt("K", 140);
138     assertThatOutcomeIs(
139         KeyValue.pair(windowFor(130000), "K"));
140
141     logStateStore(testDriver);
142
143     sendAt("L", 160);
144     assertThatOutcomeIs(
145         KeyValue.pair(windowFor(150000), "L"));
146
147     logStateStore(testDriver);
148   }
149
150
151   static final String INPUT = "TEST-INPUT";
152   static final String OUTPUT = "TEST-OUTPUT";
153
154   static final String KEY = "foo";
155
156
157   TestInputTopic<String, String> in;
158   TestOutputTopic<String, String> out;
159
160
161   void sendAt(String value, int second)
162   {
163     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
164     log.info(
165         "Sending  @ {}: {} = {}",
166         record.getRecordTime().toEpochMilli(),
167         record.key(),
168         record.value());
169     in.pipeInput(record);
170   }
171
172   void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
173   {
174     assertThat(outcome()).containsExactly(expected);
175   }
176
177   Stream<KeyValue<Windowed<String>, String>> outcome()
178   {
179     return out
180         .readRecordsToList()
181         .stream()
182         .peek(record -> log.info(
183             "Received @ {}: {} = {}",
184             record.getRecordTime().toEpochMilli(),
185             record.key(),
186             record.value()))
187         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
188   }
189
190   void logStateStore(TopologyTestDriver testDriver)
191   {
192     KeyValueIterator i = testDriver.getTimestampedWindowStore(STORE_NAME).all();
193     while(i.hasNext())
194     {
195       Object o = i.next();
196       log.info("{}", o);
197     }
198   }
199
200   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
201
202   Windowed<String> parse(String serialized)
203   {
204     Matcher matcher = PATTERN.matcher(serialized);
205
206     if (!matcher.matches())
207     {
208       throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
209     }
210
211     String key = matcher.group(1);
212     String start = matcher.group(2);
213     String end = matcher.group(3);
214
215     Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
216
217     return new Windowed<>(key, window);
218   }
219
220   Windowed<String> windowFor(long milli)
221   {
222     Instant startTime = Instant.ofEpochMilli(milli);
223     Instant endTime = startTime.plus(WINDOW_SIZE);
224     TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
225     return new Windowed<>(KEY, window);
226   }
227 }