1 package de.juplo.kafka.wordcount.splitter;
3 import de.juplo.kafka.wordcount.counter.TestWord;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.streams.KeyValue;
6 import org.junit.jupiter.api.BeforeEach;
7 import org.junit.jupiter.api.Test;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.boot.test.context.SpringBootTest;
10 import org.springframework.boot.test.context.TestConfiguration;
11 import org.springframework.context.annotation.Bean;
12 import org.springframework.kafka.annotation.KafkaListener;
13 import org.springframework.kafka.core.KafkaTemplate;
14 import org.springframework.kafka.support.KafkaHeaders;
15 import org.springframework.kafka.test.context.EmbeddedKafka;
16 import org.springframework.messaging.handler.annotation.Header;
17 import org.springframework.messaging.handler.annotation.Payload;
18 import org.springframework.util.LinkedMultiValueMap;
19 import org.springframework.util.MultiValueMap;
21 import java.time.Duration;
22 import java.util.function.BiConsumer;
23 import java.util.stream.Stream;
25 import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*;
26 import static org.assertj.core.api.Assertions.assertThat;
27 import static org.awaitility.Awaitility.await;
// Tail of a test-property list; the lines that open the enclosing annotation are not visible here.
// Producer side: values are serialized as JSON and no type headers are added.
32 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
33 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
// Consumer side: start at the earliest offset and deserialize JSON values, using TestWord as the default value type.
34 "spring.kafka.consumer.auto-offset-reset=earliest",
35 "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
36 "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord",
// NOTE(review): the trusted package is ...wordcount.splitter, while the default value type above lives in
// ...wordcount.counter -- confirm that this combination is intended.
37 "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
// Logging: WARN for everything, DEBUG for the de.juplo packages.
38 "logging.level.root=WARN",
39 "logging.level.de.juplo=DEBUG",
// Splitter settings: broker address taken from the embedded Kafka placeholder, topics taken from the constants below.
40 "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
41 "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
42 "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
// The embedded broker is configured with the same two topics.
43 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
// Integration test: sends Recording values to TOPIC_IN and checks the TestWord values received from TOPIC_OUT.
45 public class SplitterApplicationIT
// Topic names shared by the test properties, the embedded broker and the test listener.
47 public final static String TOPIC_IN = "in";
48 public final static String TOPIC_OUT = "out";
// Template used by testSendMessage to publish Recording values with String keys.
51 KafkaTemplate<String, Recording> kafkaTemplate;
// Empties the messages collected by the test consumer; the enclosing method is not visible here.
// NOTE(review): Consumer.receive() is synchronized, but this direct field access shows no guard -- confirm
// that it cannot run concurrently with the listener.
59 consumer.received.clear();
62 static void writeInputData(BiConsumer<String, Recording> consumer)
66 recording = new Recording();
67 recording.setUser("peter");
68 recording.setSentence("Hallo Welt!");
69 consumer.accept(recording.getUser(), recording);
71 recording = new Recording();
72 recording.setUser("klaus");
73 recording.setSentence("Müsch gäb's auch!");
74 consumer.accept(recording.getUser(), recording);
76 recording = new Recording();
77 recording.setUser("peter");
78 recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
79 consumer.accept(recording.getUser(), recording);
83 void testSendMessage() throws Exception
85 writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording));
87 await("Expexted converted data")
88 .atMost(Duration.ofSeconds(5))
90 assertExpectedMessages(consumer.getReceivedMessages()));
94 static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
96 MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
97 expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
98 await("Received expected messages")
99 .atMost(Duration.ofSeconds(5))
100 .untilAsserted(() -> expected.forEach((user, word) ->
101 assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
// Expected output of the splitter, consumed by assertExpectedMessages().
// The second TestWord argument lists, in order, the words of the three sentences sent by writeInputData();
// the first argument is the user name used there. The line carrying the key of each entry is not visible here.
// NOTE(review): a Stream can be traversed only once, and this one is stored in a static field -- a second
// call of assertExpectedMessages() in the same JVM would fail with an IllegalStateException. Confirm that
// single use is guaranteed, or consider a reusable collection.
104 static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
// Words of "Hallo Welt!" (peter)
107 new TestWord("peter", "Hallo")),
110 new TestWord("peter", "Welt")),
// Words of "Müsch gäb's auch!" (klaus); the apostrophe separates "gäb" and "s"
113 new TestWord("klaus", "Müsch")),
116 new TestWord("klaus", "gäb")),
119 new TestWord("klaus", "s")),
122 new TestWord("klaus", "auch")),
// Words of "Boäh, echt! ß mal nä Nümmäh!" (peter)
125 new TestWord("peter", "Boäh")),
128 new TestWord("peter", "echt")),
131 new TestWord("peter", "ß")),
134 new TestWord("peter", "mal")),
137 new TestWord("peter", "nä")),
140 new TestWord("peter", "Nümmäh")));
// Test-side listener that collects every message arriving on TOPIC_OUT, grouped by record key.
143 static class Consumer
// Collected values per key; add() appends, so values of one key are kept in the order receive() was called.
145 private final MultiValueMap<String, TestWord> received = new LinkedMultiValueMap<>();
// Subscribes to TOPIC_OUT with the consumer group "TEST".
147 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
148 public synchronized void receive(
// The key is read from the received-key header, the value from the message payload.
149 @Header(KafkaHeaders.RECEIVED_KEY) String key,
150 @Payload TestWord value)
152 log.debug("Received message: {}={}", key, value);
153 received.add(key, value);
// Synchronized accessor for the collected messages; its body is not visible here.
156 synchronized MultiValueMap<String, TestWord> getReceivedMessages()
// Nested configuration class of the test. Only the statement that creates the Consumer instance is
// visible here; the surrounding method and any annotations are not.
164 static class Configuration
169 return new Consumer();