--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Component
+@KafkaListener(topics = "test")
+@Slf4j
+@Getter
+public class MultiMessageConsumer
+{
+ private final List<Foo> foos = new LinkedList<>();
+ private final List<Bar> bars = new LinkedList<>();
+
+
+ @KafkaHandler
+ public void handleFoo(Foo foo)
+ {
+ log.info("Received a Foo: {}", foo);
+ foos.add(foo);
+ }
+
+ @KafkaHandler
+ public void handleBar(Bar bar)
+ {
+ log.info("Received a Bar: {}", bar);
+ bars.add(bar);
+ }
+}
--- /dev/null
+spring:
+ kafka:
+ bootstrap-servers: :9092
+ consumer:
+ group-id: multi
+ auto-offset-reset: earliest
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ properties:
+ spring.json.type.mapping: "foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+ spring.json.trusted.packages: "de.juplo.kafka"
+logging:
+ level:
+ root: INFO
+
package de.juplo.kafka;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
-@SpringBootTest
+import java.time.Duration;
+
+import static de.juplo.kafka.KafkahandlerApplicationTests.TOPIC;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.type.mapping=foo:de.juplo.kafka.Foo,bar:de.juplo.kafka.Bar"
+ })
+@EmbeddedKafka(
+ bootstrapServersProperty = "spring.kafka.bootstrap-servers",
+ topics = TOPIC)
class KafkahandlerApplicationTests
{
+ static final String TOPIC = "test";
+
+ @Autowired
+ KafkaTemplate<String, ? super Object> kafkaTemplate;
+ @Autowired
+ MultiMessageConsumer consumer;
+
+
@Test
- void contextLoads()
+ void receiveMessages()
{
+ Foo foo = new Foo();
+ foo.setClient("peter");
+ foo.setMessage("Hallo Welt!");
+ kafkaTemplate.send(TOPIC, foo);
+
+ Bar bar = new Bar();
+ bar.setClient("ute");
+ bar.setMessage("Greetings again!");
+ kafkaTemplate.send(TOPIC, bar);
+
+ Awaitility
+ .await("Messages received")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
+ {
+ Assertions
+ .assertThat(consumer.getFoos().size())
+ .describedAs("All send foo-messages were received")
+ .isEqualTo(1);
+ Assertions
+ .assertThat(consumer.getBars().size())
+ .describedAs("All send bar-messages were received")
+ .isEqualTo(1);
+ });
}
}