1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.support.KafkaHeaders;
8 import org.springframework.messaging.handler.annotation.Header;
9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
15 public class SimpleConsumer
17 @Value("${spring.kafka.client-id}")
20 private MessageHandler messageHandler;
22 private long consumed = 0;
25 @KafkaListener(topics = "${simple.consumer.topic}")
26 private void handleRecord(
27 @Header(KafkaHeaders.RECEIVED_TOPIC)
29 @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
31 @Header(KafkaHeaders.OFFSET)
33 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
39 log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
40 messageHandler.handle(key, value);