From 581e65e0bb9894480b4e31056d76c9cb678182ae Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Mon, 6 Jun 2022 17:06:41 +0200
Subject: [PATCH] Added a consumer, that received messages of multiple types

* The consumer receives messages of type `Foo` and `Bar`.
* Added a test, that verifies, that the consumer can receive messages
  of type `Foo` and `Bar`.
---
 pom.xml                                       |  5 ++
 src/main/java/de/juplo/kafka/Bar.java         | 11 ++++
 src/main/java/de/juplo/kafka/Foo.java         | 11 ++++
 .../de/juplo/kafka/MultiMessageConsumer.java  | 36 +++++++++++++
 src/main/resources/application.properties     |  1 -
 src/main/resources/application.yml            | 14 +++++
 .../kafka/KafkahandlerApplicationTests.java   | 54 ++++++++++++++++++-
 7 files changed, 129 insertions(+), 3 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/Bar.java
 create mode 100644 src/main/java/de/juplo/kafka/Foo.java
 create mode 100644 src/main/java/de/juplo/kafka/MultiMessageConsumer.java
 delete mode 100644 src/main/resources/application.properties
 create mode 100644 src/main/resources/application.yml

diff --git a/pom.xml b/pom.xml
index 5065249..913b352 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,11 @@
 			<artifactId>spring-kafka-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.awaitility</groupId>
+			<artifactId>awaitility</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/src/main/java/de/juplo/kafka/Bar.java b/src/main/java/de/juplo/kafka/Bar.java
new file mode 100644
index 0000000..40c939a
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Bar.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class Bar
+{
+  String client;
+  String message;
+}
diff --git a/src/main/java/de/juplo/kafka/Foo.java b/src/main/java/de/juplo/kafka/Foo.java
new file mode 100644
index 0000000..ff3cbef
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Foo.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class Foo
+{
+  String client;
+  String message;
+}
diff --git a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java
new file mode 100644
index 0000000..97638bc
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java
@@ -0,0 +1,36 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Component
+@KafkaListener(topics = "test")
+@Slf4j
+@Getter
+public class MultiMessageConsumer
+{
+  private final List<Foo> foos = new LinkedList<>();
+  private final List<Bar> bars = new LinkedList<>();
+
+
+  @KafkaHandler
+  public void handleFoo(Foo foo)
+  {
+    log.info("Received a Foo: {}", foo);
+    foos.add(foo);
+  }
+
+  @KafkaHandler
+  public void handleBar(Bar bar)
+  {
+    log.info("Received a Bar: {}", bar);
+    bars.add(bar);
+  }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index 8b13789..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..473921e
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,14 @@
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    consumer:
+      group-id: multi
+      auto-offset-reset: earliest
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      properties:
+        spring.json.type.mapping: "foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+        spring.json.trusted.packages: "de.juplo.kafka"
+logging:
+  level:
+    root: INFO
+
diff --git a/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java b/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java
index db50a55..f808a38 100644
--- a/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java
@@ -1,13 +1,63 @@
 package de.juplo.kafka;
 
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
 
-@SpringBootTest
+import java.time.Duration;
+
+import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
+
+
+@SpringBootTest(
+		properties = {
+				"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+				"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+				"spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+		})
+@EmbeddedKafka(
+		bootstrapServersProperty = "spring.kafka.bootstrap-servers",
+		topics = TOPIC)
 class KafkahandlerApplicationTests
 {
+	static final String TOPIC = "test";
+
+	@Autowired
+	KafkaTemplate<String, ? super Object> kafkaTemplate;
+	@Autowired
+	MultiMessageConsumer consumer;
+
+
 	@Test
-	void contextLoads()
+	void receiveMessages()
 	{
+		Foo foo = new Foo();
+		foo.setClient("peter");
+		foo.setMessage("Hallo Welt!");
+		kafkaTemplate.send(TOPIC, foo);
+
+		Bar bar = new Bar();
+		bar.setClient("ute");
+		bar.setMessage("Greetings again!");
+		kafkaTemplate.send(TOPIC, bar);
+
+		Awaitility
+				.await("Messages received")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() ->
+				{
+					Assertions
+							.assertThat(consumer.getFoos().size())
+							.describedAs("All send foo-messages were received")
+							.isEqualTo(1);
+					Assertions
+							.assertThat(consumer.getBars().size())
+							.describedAs("All send bar-messages were received")
+							.isEqualTo(1);
+				});
 	}
 }
-- 
2.20.1