65eeeaba2b4cac9479d59a85a82ab5181def0bb3
[demos/kafka/wordcount] /
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.WindowStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
12
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
19
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 public class AggregationTopologyTest
25 {
26   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
27   static final Duration ADVANCE = Duration.ofSeconds(3);
28   static final TimeWindows WINDOWS = TimeWindows
29       .ofSizeWithNoGrace(WINDOW_SIZE)
30       .advanceBy(ADVANCE);
31
32
33   @Test
34   public void test()
35   {
36     StreamsBuilder builder = new StreamsBuilder();
37
38     KStream<String, String> input = builder.stream(INPUT);
39
40     input
41         .groupByKey()
42         .windowedBy(WINDOWS)
43         .reduce(
44             (aggregate, value) -> aggregate + "-" + value,
45             Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
46                 .withKeySerde(Serdes.String())
47                 .withValueSerde(Serdes.String()))
48         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
49         .toStream((k,v) -> k.toString())
50         .to(OUTPUT);
51
52     Topology topology = builder.build();
53     log.info("Generated topology: {}", topology.describe());
54
55     Properties properties = new Properties();
56     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
57     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
58
59     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
60
61     in = testDriver.createInputTopic(
62         INPUT,
63         Serdes.String().serializer(),
64         Serdes.String().serializer());
65     out = testDriver.createOutputTopic(
66         OUTPUT,
67         Serdes.String().deserializer(),
68         Serdes.String().deserializer());
69
70
71     sendAt("A", 63);
72     assertThatOutcomeIs();
73
74     sendAt("B", 64);
75     assertThatOutcomeIs(KeyValue.pair(windowFor(54), "A"));
76
77     sendAt("C", 65);
78     assertThatOutcomeIs();
79
80     sendAt("D", 66);
81     assertThatOutcomeIs();
82
83     sendAt("E", 69);
84     assertThatOutcomeIs(KeyValue.pair(windowFor(57), "A-B-C-D"));
85
86     sendAt("F", 70);
87     assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
88
89     sendAt("G", 74);
90     assertThatOutcomeIs(KeyValue.pair(windowFor(63), "A-B-C-D-E-F"));
91
92     sendAt("H", 75);
93     assertThatOutcomeIs();
94
95     sendAt("I", 100);
96     assertThatOutcomeIs(
97         KeyValue.pair(windowFor(66), "D-E-F-G-H"),
98         KeyValue.pair(windowFor(69), "E-F-G-H"),
99         KeyValue.pair(windowFor(72), "G-H"),
100         KeyValue.pair(windowFor(75), "H"));
101
102     sendAt("J", 120);
103     assertThatOutcomeIs(
104         KeyValue.pair(windowFor(93), "I"),
105         KeyValue.pair(windowFor(96), "I"),
106         KeyValue.pair(windowFor(99), "I"));
107
108     sendAt("K", 140);
109     assertThatOutcomeIs(
110         KeyValue.pair(windowFor(111), "J"),
111         KeyValue.pair(windowFor(114), "J"),
112         KeyValue.pair(windowFor(117), "J"),
113         KeyValue.pair(windowFor(120), "J"));
114
115     sendAt("L", 160);
116     assertThatOutcomeIs(
117         KeyValue.pair(windowFor(132), "K"),
118         KeyValue.pair(windowFor(135), "K"),
119         KeyValue.pair(windowFor(138), "K"));
120
121     // Never received, if no newer message is send
122     // KeyValue.pair(windowFor(153), "L")
123     // KeyValue.pair(windowFor(156), "L")
124     //KeyValue.pair(windowFor(159), "L")
125   }
126
127
128   static final String INPUT = "TEST-INPUT";
129   static final String OUTPUT = "TEST-OUTPUT";
130
131   static final String KEY = "foo";
132
133
134   TestInputTopic<String, String> in;
135   TestOutputTopic<String, String> out;
136
137
138   void sendAt(String value, int second)
139   {
140     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
141     log.info(
142         "Sending  @ {}: {} = {}",
143         record.getRecordTime().toEpochMilli(),
144         record.key(),
145         record.value());
146     in.pipeInput(record);
147   }
148
149   void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
150   {
151     assertThat(outcome()).containsExactly(expected);
152   }
153
154   Stream<KeyValue<Windowed<String>, String>> outcome()
155   {
156     return out
157         .readRecordsToList()
158         .stream()
159         .peek(record -> log.info(
160             "Received @ {}: {} = {}",
161             record.getRecordTime().toEpochMilli(),
162             record.key(),
163             record.value()))
164         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
165   }
166
167
168   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
169
170   Windowed<String> parse(String serialized)
171   {
172     Matcher matcher = PATTERN.matcher(serialized);
173
174     if (!matcher.matches())
175     {
176       throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
177     }
178
179     String key = matcher.group(1);
180     String start = matcher.group(2);
181     String end = matcher.group(3);
182
183     Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
184
185     return new Windowed<>(key, window);
186   }
187
188   Windowed<String> windowFor(int second)
189   {
190     Instant startTime = Instant.ofEpochSecond(second);
191     Instant endTime = startTime.plus(WINDOW_SIZE);
192     TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
193     return new Windowed<>(KEY, window);
194   }
195 }