Added a consumer, that received messages of multiple types
[demos/kafka/kafkahandler] / src / test / java / de / juplo / kafka / KafkahandlerApplicationTests.java
index db50a55..f808a38 100644 (file)
@@ -1,13 +1,63 @@
 package de.juplo.kafka;
 
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
 
-@SpringBootTest
+import java.time.Duration;
+
+import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+               })
+@EmbeddedKafka(
+               bootstrapServersProperty = "spring.kafka.bootstrap-servers",
+               topics = TOPIC)
 class KafkahandlerApplicationTests
 {
+       static final String TOPIC = "test";
+
+       @Autowired
+       KafkaTemplate<String, ? super Object> kafkaTemplate;
+       @Autowired
+       MultiMessageConsumer consumer;
+
+
        @Test
-       void contextLoads()
+       void receiveMessages()
        {
+               Foo foo = new Foo();
+               foo.setClient("peter");
+               foo.setMessage("Hallo Welt!");
+               kafkaTemplate.send(TOPIC, foo);
+
+               Bar bar = new Bar();
+               bar.setClient("ute");
+               bar.setMessage("Greetings again!");
+               kafkaTemplate.send(TOPIC, bar);
+
+               Awaitility
+                               .await("Messages received")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() ->
+                               {
+                                       Assertions
+                                                       .assertThat(consumer.getFoos().size())
+                                                       .describedAs("All send foo-messages were received")
+                                                       .isEqualTo(1);
+                                       Assertions
+                                                       .assertThat(consumer.getBars().size())
+                                                       .describedAs("All send bar-messages were received")
+                                                       .isEqualTo(1);
+                               });
        }
 }