import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
private String id;
private long consumed = 0;
- @KafkaListener(topics = "${simple.consumer.topic}")
+ @KafkaListener(
+ topicPartitions = @TopicPartition(
+ topic = "${simple.consumer.topic}",
+ partitionOffsets = @PartitionOffset(
+ partition = "0",
+ initialOffset = "0")))
private void handleRecord(
@Header(KafkaHeaders.RECEIVED_TOPIC)
String topic,