From 4a47513393bbe1aaaf8c9674ad978de5da2a9d7d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 6 Jun 2022 18:04:56 +0200 Subject: [PATCH] Triggerd bug (?) by adding a parameter of type `ConsumerRecordMetadata` --- .../de/juplo/kafka/MultiMessageConsumer.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java index 97638bc..e6179e1 100644 --- a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java +++ b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java @@ -4,6 +4,10 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.LinkedList; @@ -21,16 +25,30 @@ public class MultiMessageConsumer @KafkaHandler - public void handleFoo(Foo foo) + public void handleFoo( + @Payload Foo foo, + @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata) { - log.info("Received a Foo: {}", foo); + log.info( + "Received a Foo: {}, topic={}, partition={}, offset={}", + foo, + metadata.topic(), + metadata.partition(), + metadata.offset()); foos.add(foo); } @KafkaHandler - public void handleBar(Bar bar) + public void handleBar( + @Payload Bar bar, + @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata) { - log.info("Received a Bar: {}", bar); + log.info( + "Received a Bar: {}", + bar, + metadata.topic(), + metadata.partition(), + metadata.offset()); bars.add(bar); } } -- 2.20.1