projects
/
demos
/
kafka
/
kafkahandler
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
581e65e
)
Triggerd bug (?) by adding a parameter of type `ConsumerRecordMetadata`
fails-with-metadata
author
Kai Moritz
<kai@juplo.de>
Mon, 6 Jun 2022 16:04:56 +0000
(18:04 +0200)
committer
Kai Moritz
<kai@juplo.de>
Mon, 6 Jun 2022 16:47:55 +0000
(18:47 +0200)
src/main/java/de/juplo/kafka/MultiMessageConsumer.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/MultiMessageConsumer.java
b/src/main/java/de/juplo/kafka/MultiMessageConsumer.java
index
97638bc
..
e6179e1
100644
(file)
--- a/
src/main/java/de/juplo/kafka/MultiMessageConsumer.java
+++ b/
src/main/java/de/juplo/kafka/MultiMessageConsumer.java
@@
-4,6
+4,10
@@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
@@
-21,16
+25,30
@@
public class MultiMessageConsumer
@KafkaHandler
@KafkaHandler
- public void handleFoo(Foo foo)
+ public void handleFoo(
+ @Payload Foo foo,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
{
- log.info("Received a Foo: {}", foo);
+ log.info(
+ "Received a Foo: {}, topic={}, partition={}, offset={}",
+ foo,
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset());
foos.add(foo);
}
@KafkaHandler
foos.add(foo);
}
@KafkaHandler
- public void handleBar(Bar bar)
+ public void handleBar(
+ @Payload Bar bar,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
{
- log.info("Received a Bar: {}", bar);
+ log.info(
+ "Received a Bar: {}",
+ bar,
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset());
bars.add(bar);
}
}
bars.add(bar);
}
}