import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
@KafkaHandler
- public void handleFoo(Foo foo)
+ public void handleFoo(
+ @Payload Foo foo,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
- log.info("Received a Foo: {}", foo);
+ log.info(
+ "Received a Foo: {}, topic={}, partition={}, offset={}",
+ foo,
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset());
foos.add(foo);
}
@KafkaHandler
- public void handleBar(Bar bar)
+ public void handleBar(
+ @Payload Bar bar,
+ @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
{
- log.info("Received a Bar: {}", bar);
+ log.info(
+ "Received a Bar: {}",
+ bar,
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset());
bars.add(bar);
}
}