Added a consumer, that received messages of multiple types works-with-multiple-message-types
authorKai Moritz <kai@juplo.de>
Mon, 6 Jun 2022 15:06:41 +0000 (17:06 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 6 Jun 2022 16:47:52 +0000 (18:47 +0200)
* The consumer receives messages of type `Foo` and `Bar`.
* Added a test, that verifies, that the consumer can receive messages
  of type `Foo` and `Bar`.

pom.xml
src/main/java/de/juplo/kafka/Bar.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/Foo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MultiMessageConsumer.java [new file with mode: 0644]
src/main/resources/application.properties [deleted file]
src/main/resources/application.yml [new file with mode: 0644]
src/test/java/de/juplo/kafka/KafkahandlerApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 5065249..913b352 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-kafka-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.awaitility</groupId>
+                       <artifactId>awaitility</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git a/src/main/java/de/juplo/kafka/Bar.java b/src/main/java/de/juplo/kafka/Bar.java
new file mode 100644 (file)
index 0000000..40c939a
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class Bar
+{
+  String client;
+  String message;
+}
diff --git a/src/main/java/de/juplo/kafka/Foo.java b/src/main/java/de/juplo/kafka/Foo.java
new file mode 100644 (file)
index 0000000..ff3cbef
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class Foo
+{
+  String client;
+  String message;
+}
diff --git a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java
new file mode 100644 (file)
index 0000000..97638bc
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Component
+@KafkaListener(topics = "test")
+@Slf4j
+@Getter
+public class MultiMessageConsumer
+{
+  private final List<Foo> foos = new LinkedList<>();
+  private final List<Bar> bars = new LinkedList<>();
+
+
+  @KafkaHandler
+  public void handleFoo(Foo foo)
+  {
+    log.info("Received a Foo: {}", foo);
+    foos.add(foo);
+  }
+
+  @KafkaHandler
+  public void handleBar(Bar bar)
+  {
+    log.info("Received a Bar: {}", bar);
+    bars.add(bar);
+  }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644 (file)
index 8b13789..0000000
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..473921e
--- /dev/null
@@ -0,0 +1,14 @@
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    consumer:
+      group-id: multi
+      auto-offset-reset: earliest
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      properties:
+        spring.json.type.mapping: "foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+        spring.json.trusted.packages: "de.juplo.kafka"
+logging:
+  level:
+    root: INFO
+
index db50a55..f808a38 100644 (file)
@@ -1,13 +1,63 @@
 package de.juplo.kafka;
 
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
 
-@SpringBootTest
+import java.time.Duration;
+
+import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+               })
+@EmbeddedKafka(
+               bootstrapServersProperty = "spring.kafka.bootstrap-servers",
+               topics = TOPIC)
 class KafkahandlerApplicationTests
 {
+       static final String TOPIC = "test";
+
+       @Autowired
+       KafkaTemplate<String, ? super Object> kafkaTemplate;
+       @Autowired
+       MultiMessageConsumer consumer;
+
+
        @Test
-       void contextLoads()
+       void receiveMessages()
        {
+               Foo foo = new Foo();
+               foo.setClient("peter");
+               foo.setMessage("Hallo Welt!");
+               kafkaTemplate.send(TOPIC, foo);
+
+               Bar bar = new Bar();
+               bar.setClient("ute");
+               bar.setMessage("Greetings again!");
+               kafkaTemplate.send(TOPIC, bar);
+
+               Awaitility
+                               .await("Messages received")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() ->
+                               {
+                                       Assertions
+                                                       .assertThat(consumer.getFoos().size())
+                                                       .describedAs("All send foo-messages were received")
+                                                       .isEqualTo(1);
+                                       Assertions
+                                                       .assertThat(consumer.getBars().size())
+                                                       .describedAs("All send bar-messages were received")
+                                                       .isEqualTo(1);
+                               });
        }
 }