d1bbc0fb794280600901bc60ba70b6e7571c9788
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / splitter / SplitterApplicationIT.java
1 package de.juplo.kafka.wordcount.splitter;
2
import de.juplo.kafka.wordcount.counter.TestWord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
28
29
30 @SpringBootTest(
31                 properties = {
32                                 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
33                                 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
34                                 "spring.kafka.consumer.auto-offset-reset=earliest",
35                                 "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
36                                 "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord",
37                                 "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
38                                 "logging.level.root=WARN",
39                                 "logging.level.de.juplo=DEBUG",
40                                 "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
41                                 "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
42                                 "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
43 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
44 @Slf4j
45 public class SplitterApplicationIT
46 {
47         public final static String TOPIC_IN = "in";
48         public final static String TOPIC_OUT = "out";
49
50         @Autowired
51         KafkaTemplate<String, Recording> kafkaTemplate;
52         @Autowired
53         Consumer consumer;
54
55
56         @BeforeEach
57         public void clear()
58         {
59                 consumer.received.clear();
60         }
61
62         static void writeInputData(BiConsumer<String, Recording> consumer)
63         {
64                 Recording recording;
65
66                 recording = new Recording();
67                 recording.setUser("peter");
68                 recording.setSentence("Hallo Welt!");
69                 consumer.accept(recording.getUser(), recording);
70
71                 recording = new Recording();
72                 recording.setUser("klaus");
73                 recording.setSentence("Müsch gäb's auch!");
74                 consumer.accept(recording.getUser(), recording);
75
76                 recording = new Recording();
77                 recording.setUser("peter");
78                 recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
79                 consumer.accept(recording.getUser(), recording);
80         }
81
82         @Test
83         void testSendMessage() throws Exception
84         {
85                 writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording));
86
87                 await("Expexted converted data")
88                                 .atMost(Duration.ofSeconds(5))
89                                 .untilAsserted(() ->
90                                                 assertExpectedMessages(consumer.getReceivedMessages()));
91         }
92
93
94         static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
95         {
96                 MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
97                 expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
98                 await("Received expected messages")
99                                 .atMost(Duration.ofSeconds(5))
100                                 .untilAsserted(() -> expected.forEach((user, word) ->
101                                                 assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
102         }
103
104         static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
105                         KeyValue.pair(
106                                         "peter",
107                                         new TestWord("peter", "Hallo")),
108                         KeyValue.pair(
109                                         "peter",
110                                         new TestWord("peter", "Welt")),
111                         KeyValue.pair(
112                                         "klaus",
113                                         new TestWord("klaus", "Müsch")),
114                         KeyValue.pair(
115                                         "klaus",
116                                         new TestWord("klaus", "gäb")),
117                         KeyValue.pair(
118                                         "klaus",
119                                         new TestWord("klaus", "s")),
120                         KeyValue.pair(
121                                         "klaus",
122                                         new TestWord("klaus", "auch")),
123                         KeyValue.pair(
124                                         "peter",
125                                         new TestWord("peter", "Boäh")),
126                         KeyValue.pair(
127                                         "peter",
128                                         new TestWord("peter", "echt")),
129                         KeyValue.pair(
130                                         "peter",
131                                         new TestWord("peter", "ß")),
132                         KeyValue.pair(
133                                         "peter",
134                                         new TestWord("peter", "mal")),
135                         KeyValue.pair(
136                                         "peter",
137                                         new TestWord("peter", "nä")),
138                         KeyValue.pair(
139                                         "peter",
140                                         new TestWord("peter", "Nümmäh")));
141
142
143         static class Consumer
144         {
145                 private final MultiValueMap<String, TestWord> received = new LinkedMultiValueMap<>();
146
147                 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
148                 public synchronized void receive(
149                                 @Header(KafkaHeaders.RECEIVED_KEY) String key,
150                                 @Payload TestWord value)
151                 {
152                         log.debug("Received message: {}={}", key, value);
153                         received.add(key, value);
154                 }
155
156                 synchronized MultiValueMap<String, TestWord> getReceivedMessages()
157                 {
158                         return received;
159                 }
160         }
161
162
163         @TestConfiguration
164         static class Configuration
165         {
166                 @Bean
167                 Consumer consumer()
168                 {
169                         return new Consumer();
170                 }
171         }
172 }