Added a consumer that receives messages of multiple types
[demos/kafka/kafkahandler] / src / test / java / de / juplo / kafka / KafkahandlerApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.assertj.core.api.Assertions;
4 import org.awaitility.Awaitility;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.boot.test.context.SpringBootTest;
8 import org.springframework.kafka.core.KafkaTemplate;
9 import org.springframework.kafka.test.context.EmbeddedKafka;
10
11 import java.time.Duration;
12
13 import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
14
15
16 @SpringBootTest(
17                 properties = {
18                                 "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
19                                 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
20                                 "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
21                 })
22 @EmbeddedKafka(
23                 bootstrapServersProperty = "spring.kafka.bootstrap-servers",
24                 topics = TOPIC)
25 class KafkahandlerApplicationTests
26 {
27         static final String TOPIC = "test";
28
29         @Autowired
30         KafkaTemplate<String, ? super Object> kafkaTemplate;
31         @Autowired
32         MultiMessageConsumer consumer;
33
34
35         @Test
36         void receiveMessages()
37         {
38                 Foo foo = new Foo();
39                 foo.setClient("peter");
40                 foo.setMessage("Hallo Welt!");
41                 kafkaTemplate.send(TOPIC, foo);
42
43                 Bar bar = new Bar();
44                 bar.setClient("ute");
45                 bar.setMessage("Greetings again!");
46                 kafkaTemplate.send(TOPIC, bar);
47
48                 Awaitility
49                                 .await("Messages received")
50                                 .atMost(Duration.ofSeconds(5))
51                                 .untilAsserted(() ->
52                                 {
53                                         Assertions
54                                                         .assertThat(consumer.getFoos().size())
55                                                         .describedAs("All send foo-messages were received")
56                                                         .isEqualTo(1);
57                                         Assertions
58                                                         .assertThat(consumer.getBars().size())
59                                                         .describedAs("All send bar-messages were received")
60                                                         .isEqualTo(1);
61                                 });
62         }
63 }