Triggerd bug (?) by adding a parameter of type `ConsumerRecordMetadata` fails-with-metadata
authorKai Moritz <kai@juplo.de>
Mon, 6 Jun 2022 16:04:56 +0000 (18:04 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 6 Jun 2022 16:47:55 +0000 (18:47 +0200)
src/main/java/de/juplo/kafka/MultiMessageConsumer.java

index 97638bc..e6179e1 100644 (file)
@@ -4,6 +4,10 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.stereotype.Component;
 
 import java.util.LinkedList;
@@ -21,16 +25,30 @@ public class MultiMessageConsumer
 
 
   @KafkaHandler
-  public void handleFoo(Foo foo)
+  public void handleFoo(
+      @Payload Foo foo,
+      @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
   {
-    log.info("Received a Foo: {}", foo);
+    log.info(
+        "Received a Foo: {}, topic={}, partition={}, offset={}",
+        foo,
+        metadata.topic(),
+        metadata.partition(),
+        metadata.offset());
     foos.add(foo);
   }
 
   @KafkaHandler
-  public void handleBar(Bar bar)
+  public void handleBar(
+      @Payload Bar bar,
+      @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
   {
-    log.info("Received a Bar: {}", bar);
+    log.info(
+        "Received a Bar: {}",
+        bar,
+        metadata.topic(),
+        metadata.partition(),
+        metadata.offset());
     bars.add(bar);
   }
 }