1 package de.juplo.kafka;
3 import org.assertj.core.api.Assertions;
4 import org.awaitility.Awaitility;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.boot.test.context.SpringBootTest;
8 import org.springframework.kafka.core.KafkaTemplate;
9 import org.springframework.kafka.test.context.EmbeddedKafka;
11 import java.time.Duration;
13 import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
18 "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
19 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
20 "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
23 bootstrapServersProperty = "spring.kafka.bootstrap-servers",
25 class KafkahandlerApplicationTests
27 static final String TOPIC = "test";
30 KafkaTemplate<String, ? super Object> kafkaTemplate;
32 MultiMessageConsumer consumer;
36 void receiveMessages()
39 foo.setClient("peter");
40 foo.setMessage("Hallo Welt!");
41 kafkaTemplate.send(TOPIC, foo);
45 bar.setMessage("Greetings again!");
46 kafkaTemplate.send(TOPIC, bar);
49 .await("Messages received")
50 .atMost(Duration.ofSeconds(5))
54 .assertThat(consumer.getFoos().size())
55 .describedAs("All send foo-messages were received")
58 .assertThat(consumer.getBars().size())
59 .describedAs("All send bar-messages were received")