From: Kai Moritz Date: Sun, 13 Nov 2022 16:08:36 +0000 (+0100) Subject: Producer liest nur Partition 0, stets von Offset 0 X-Git-Tag: spring-consumer--topicpartition-DEPRECATED X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7a9916194eb5d6ed2066692800aa1f6d6c9f0a9b;p=demos%2Fkafka%2Ftraining Producer liest nur Partition 0, stets von Offset 0 * Der Producer benutzt jetzt `assign()` * Da er sich beim Speichern der Offsets mit der Consumer-Group in die Quere kommt, muss auch die Group-ID angepasst werden! --- diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index fe0479f..f94f252 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -3,6 +3,8 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; +import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -17,7 +19,12 @@ public class SimpleConsumer private String id; private long consumed = 0; - @KafkaListener(topics = "${simple.consumer.topic}") + @KafkaListener( + topicPartitions = @TopicPartition( + topic = "${simple.consumer.topic}", + partitionOffsets = @PartitionOffset( + partition = "0", + initialOffset = "0"))) private void handleRecord( @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9ca7eb8..7430b33 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -26,7 +26,7 @@ spring: bootstrap-servers: :9092 client-id: DEV consumer: - group-id: my-group + group-id: my-reprocessing logging: level: root: INFO