3dfd235143393c24c885905f749f31af913dd9e0
[demos/kafka/wordcount] /
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.*;
8 import org.apache.kafka.streams.kstream.internals.SessionWindow;
9 import org.apache.kafka.streams.state.SessionStore;
10 import org.apache.kafka.streams.test.TestRecord;
11 import org.junit.jupiter.api.Test;
12
13 import java.time.Duration;
14 import java.time.Instant;
15 import java.util.Properties;
16 import java.util.regex.Matcher;
17 import java.util.regex.Pattern;
18 import java.util.stream.Stream;
19
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 public class AggregationTopologyTest
25 {
26   static final String STORE_NAME = "aggregate-store";
27   static final Duration INACTIVITY_GAP = Duration.ofSeconds(2);
28   static final SessionWindows WINDOWS = SessionWindows.ofInactivityGapWithNoGrace(INACTIVITY_GAP);
29
30
31   @Test
32   public void test()
33   {
34     StreamsBuilder builder = new StreamsBuilder();
35
36     KStream<String, String> input = builder.stream(INPUT);
37
38     input
39         .groupByKey()
40         .windowedBy(WINDOWS)
41         .reduce(
42             (aggregate, value) -> aggregate + "-" + value,
43             Materialized.<String, String, SessionStore<Bytes, byte[]>>as(STORE_NAME)
44                 .withKeySerde(Serdes.String())
45                 .withValueSerde(Serdes.String()))
46         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
47         .toStream((k,v) -> k.toString())
48         .to(OUTPUT);
49
50     Topology topology = builder.build();
51     log.info("Generated topology: {}", topology.describe());
52
53     Properties properties = new Properties();
54     properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
55     properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
56
57     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
58
59     in = testDriver.createInputTopic(
60         INPUT,
61         Serdes.String().serializer(),
62         Serdes.String().serializer());
63     out = testDriver.createOutputTopic(
64         OUTPUT,
65         Serdes.String().deserializer(),
66         Serdes.String().deserializer());
67
68
69     sendAt("A", 63);
70     assertThatOutcomeIs();
71
72     sendAt("B", 64);
73     assertThatOutcomeIs();
74
75     sendAt("C", 65);
76     assertThatOutcomeIs();
77
78     sendAt("D", 66);
79     assertThatOutcomeIs();
80
81     sendAt("E", 69);
82     assertThatOutcomeIs(KeyValue.pair(windowFor(63, 66), "A-B-C-D"));
83
84     sendAt("F", 70);
85     assertThatOutcomeIs();
86
87     sendAt("G", 74);
88     assertThatOutcomeIs(KeyValue.pair(windowFor(69, 70), "E-F"));
89
90     sendAt("H", 75);
91     assertThatOutcomeIs();
92
93     sendAt("I", 100);
94     assertThatOutcomeIs(KeyValue.pair(windowFor(74, 75), "G-H"));
95
96     sendAt("J", 120);
97     assertThatOutcomeIs(KeyValue.pair(windowFor(100, 100), "I"));
98
99     sendAt("K", 140);
100     assertThatOutcomeIs(KeyValue.pair(windowFor(120, 120), "J"));
101
102     sendAt("L", 160);
103     assertThatOutcomeIs(KeyValue.pair(windowFor(140, 140), "K"));
104
105     // Never received, if no newer message is sent
106     // KeyValue.pair(windowFor(160, 160), "L"));
107   }
108
109
110   static final String INPUT = "TEST-INPUT";
111   static final String OUTPUT = "TEST-OUTPUT";
112
113   static final String KEY = "foo";
114
115
116   TestInputTopic<String, String> in;
117   TestOutputTopic<String, String> out;
118
119
120   void sendAt(String value, int second)
121   {
122     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
123     log.info(
124         "Sending  @ {}: {} = {}",
125         record.getRecordTime().toEpochMilli(),
126         record.key(),
127         record.value());
128     in.pipeInput(record);
129   }
130
131   void assertThatOutcomeIs(KeyValue<Windowed<String>, String>... expected)
132   {
133     assertThat(outcome()).containsExactly(expected);
134   }
135
136   Stream<KeyValue<Windowed<String>, String>> outcome()
137   {
138     return out
139         .readRecordsToList()
140         .stream()
141         .peek(record -> log.info(
142             "Received @ {}: {} = {}",
143             record.getRecordTime().toEpochMilli(),
144             record.key(),
145             record.value()))
146         .map(record -> KeyValue.pair(parse(record.key()), record.value()));
147   }
148
149
150   static final Pattern PATTERN = Pattern.compile("^\\[([^@]*)@(\\d+)/(\\d+)\\]$");
151
152   Windowed<String> parse(String serialized)
153   {
154     Matcher matcher = PATTERN.matcher(serialized);
155
156     if (!matcher.matches())
157     {
158       throw new IllegalArgumentException(serialized + "does not match " + PATTERN.pattern());
159     }
160
161     String key = matcher.group(1);
162     String start = matcher.group(2);
163     String end = matcher.group(3);
164
165     Window window = new SessionWindow(Long.parseLong(start), Long.parseLong(end));
166
167     return new Windowed<>(key, window);
168   }
169
170   Windowed<String> windowFor(int startSecond, int endSecond)
171   {
172     Instant startTime = Instant.ofEpochSecond(startSecond);
173     Instant endTime = Instant.ofEpochSecond(endSecond);
174     SessionWindow window = new SessionWindow(startTime.toEpochMilli(), endTime.toEpochMilli());
175     return new Windowed<>(KEY, window);
176   }
177 }