a650311f856616422377b62d94052d7e35f858f0
[demos/kafka/wordcount] /
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.TimeWindow;
9 import org.apache.kafka.streams.state.WindowStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
12
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
19
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 public class AggregationTopologyTest
25 {
26   static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
27   static final TimeWindows WINDOWS = TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE);
28
29
30   @Test
31   public void test()
32   {
33     StreamsBuilder builder = new StreamsBuilder();
34
35     KStream<String, String> input = builder.stream(INPUT);
36
37     input
38         .groupByKey()
39         .windowedBy(WINDOWS)
40         .reduce(
41             (aggregate, value) -> aggregate + "-" + value,
42             Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-store")
43                 .withKeySerde(Serdes.String())
44                 .withValueSerde(Serdes.String()))
45         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
46         .toStream((k,v) -> k.toString())
47         .to(OUTPUT);
48
49     Topology topology = builder.build();
50     log.info("Generated topology: {}", topology.describe());
51
52     Properties properties = new Properties();
53     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
54     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
55
56     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
57
58     in = testDriver.createInputTopic(
59         INPUT,
60         Serdes.String().serializer(),
61         Serdes.String().serializer());
62     out = testDriver.createOutputTopic(
63         OUTPUT,
64         Serdes.String().deserializer(),
65         Serdes.String().deserializer());
66
67
68     sendAt("A", 63);
69     assertThatOutcomeIs();
70
71     sendAt("B", 64);
72     assertThatOutcomeIs();
73
74     sendAt("C", 65);
75     assertThatOutcomeIs();
76
77     sendAt("D", 66);
78     assertThatOutcomeIs();
79
80     sendAt("E", 69);
81     assertThatOutcomeIs();
82
83     sendAt("F", 70);
84     assertThatOutcomeIs(KeyValue.pair(windowFor(60), "A-B-C-D-E"));
85
86     sendAt("G", 74);
87     assertThatOutcomeIs();
88
89     sendAt("H", 75);
90     assertThatOutcomeIs();
91
92     sendAt("I", 100);
93     assertThatOutcomeIs(KeyValue.pair(windowFor(70), "F-G-H"));
94
95     sendAt("J", 120);
96     assertThatOutcomeIs(KeyValue.pair(windowFor(100), "I"));
97
98     sendAt("K", 140);
99     assertThatOutcomeIs(KeyValue.pair(windowFor(120), "J"));
100
101     sendAt("L", 160);
102     assertThatOutcomeIs(KeyValue.pair(windowFor(140), "K"));
103
104     // Never received, if no newer message is send
105     // KeyValue.pair(windowFor(160), "L")
106   }
107
108
109   static final String INPUT = "TEST-INPUT";
110   static final String OUTPUT = "TEST-OUTPUT";
111
112   static final String KEY = "foo";
113
114
115   TestInputTopic<String, String> in;
116   TestOutputTopic<String, String> out;
117
118
119   void sendAt(String value, int second)
120   {
121     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
122     log.info(
123         "Sending  @ {}: {} = {}",
124         record.getRecordTime().toEpochMilli(),
125         record.key(),
126         record.value());
127     in.pipeInput(record);
128   }
129
130   void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
131   {
132     assertThat(outcome()).containsExactly(expected);
133   }
134
135   Stream<KeyValue<Windowed<String>, String>> outcome()
136   {
137     return out
138         .readRecordsToList()
139         .stream()
140         .peek(record -> log.info(
141             "Received @ {}: {} = {}",
142             record.getRecordTime().toEpochMilli(),
143             record.key(),
144             record.value()))
145         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
146   }
147
148
149   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
150
151   Windowed<String> parse(String serialized)
152   {
153     Matcher matcher = PATTERN.matcher(serialized);
154
155     if (!matcher.matches())
156     {
157       throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
158     }
159
160     String key = matcher.group(1);
161     String start = matcher.group(2);
162     String end = matcher.group(3);
163
164     Window window = new TimeWindow(Long.parseLong(start), Long.parseLong(end));
165
166     return new Windowed<>(key, window);
167   }
168
169   Windowed<String> windowFor(int second)
170   {
171     Instant startTime = Instant.ofEpochSecond(second);
172     Instant endTime = startTime.plus(WINDOW_SIZE);
173     TimeWindow window = new TimeWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
174     return new Windowed<>(KEY, window);
175   }
176 }