From 581e65e0bb9894480b4e31056d76c9cb678182ae Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 6 Jun 2022 17:06:41 +0200 Subject: [PATCH] Added a consumer, that received messages of multiple types * The consumer receives messages of type `Foo` and `Bar`. * Added a test, that verifies, that the consumer can receive messages of type `Foo` and `Bar`. --- pom.xml | 5 ++ src/main/java/de/juplo/kafka/Bar.java | 11 ++++ src/main/java/de/juplo/kafka/Foo.java | 11 ++++ .../de/juplo/kafka/MultiMessageConsumer.java | 36 +++++++++++++ src/main/resources/application.properties | 1 - src/main/resources/application.yml | 14 +++++ .../kafka/KafkahandlerApplicationTests.java | 54 ++++++++++++++++++- 7 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Bar.java create mode 100644 src/main/java/de/juplo/kafka/Foo.java create mode 100644 src/main/java/de/juplo/kafka/MultiMessageConsumer.java delete mode 100644 src/main/resources/application.properties create mode 100644 src/main/resources/application.yml diff --git a/pom.xml b/pom.xml index 5065249..913b352 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,11 @@ spring-kafka-test test + + org.awaitility + awaitility + test + diff --git a/src/main/java/de/juplo/kafka/Bar.java b/src/main/java/de/juplo/kafka/Bar.java new file mode 100644 index 0000000..40c939a --- /dev/null +++ b/src/main/java/de/juplo/kafka/Bar.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class Bar +{ + String client; + String message; +} diff --git a/src/main/java/de/juplo/kafka/Foo.java b/src/main/java/de/juplo/kafka/Foo.java new file mode 100644 index 0000000..ff3cbef --- /dev/null +++ b/src/main/java/de/juplo/kafka/Foo.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class Foo +{ + String client; + String message; +} diff --git a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java new file mode 100644 index 0000000..97638bc --- /dev/null +++ b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java @@ -0,0 +1,36 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.LinkedList; +import java.util.List; + + +@Component +@KafkaListener(topics = "test") +@Slf4j +@Getter +public class MultiMessageConsumer +{ + private final List foos = new LinkedList<>(); + private final List bars = new LinkedList<>(); + + + @KafkaHandler + public void handleFoo(Foo foo) + { + log.info("Received a Foo: {}", foo); + foos.add(foo); + } + + @KafkaHandler + public void handleBar(Bar bar) + { + log.info("Received a Bar: {}", bar); + bars.add(bar); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index 8b13789..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..473921e --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,14 @@ +spring: + kafka: + bootstrap-servers: :9092 + consumer: + group-id: multi + auto-offset-reset: earliest + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.type.mapping: "foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar" + spring.json.trusted.packages: "de.juplo.kafka" +logging: + level: + root: INFO + diff --git a/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java b/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java index db50a55..f808a38 100644 --- a/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java +++ b/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java @@ -1,13 +1,63 @@ package de.juplo.kafka; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; -@SpringBootTest +import java.time.Duration; + +import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar" + }) +@EmbeddedKafka( + bootstrapServersProperty = "spring.kafka.bootstrap-servers", + topics = TOPIC) class KafkahandlerApplicationTests { + static final String TOPIC = "test"; + + @Autowired + KafkaTemplate kafkaTemplate; + @Autowired + MultiMessageConsumer consumer; + + @Test - void contextLoads() + void receiveMessages() { + Foo foo = new Foo(); + foo.setClient("peter"); + foo.setMessage("Hallo Welt!"); + kafkaTemplate.send(TOPIC, foo); + + Bar bar = new Bar(); + bar.setClient("ute"); + bar.setMessage("Greetings again!"); + kafkaTemplate.send(TOPIC, bar); + + Awaitility + .await("Messages received") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + { + Assertions + .assertThat(consumer.getFoos().size()) + .describedAs("All send foo-messages were received") + .isEqualTo(1); + Assertions + .assertThat(consumer.getBars().size()) + .describedAs("All send bar-messages were received") + .isEqualTo(1); + }); } } -- 2.20.1