Producer liest nur Partition 0, stets von Offset 0 spring-consumer--topicpartition
authorKai Moritz <kai@juplo.de>
Sun, 13 Nov 2022 16:08:36 +0000 (17:08 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 13 Nov 2022 17:03:44 +0000 (18:03 +0100)
* Der Producer benutzt jetzt `assign()`
* Da er sich beim Speichern der Offsets mit der Consumer-Group in die
  Quere kommt, muss auch die Group-ID angepasst werden!

src/main/java/de/juplo/kafka/SimpleConsumer.java
src/main/resources/application.yml

index fe0479f..f94f252 100644 (file)
@@ -3,6 +3,8 @@ package de.juplo.kafka;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.Payload;
@@ -17,7 +19,12 @@ public class SimpleConsumer
   private String id;
   private long consumed = 0;
 
-  @KafkaListener(topics = "${simple.consumer.topic}")
+  @KafkaListener(
+    topicPartitions = @TopicPartition(
+      topic = "${simple.consumer.topic}",
+      partitionOffsets = @PartitionOffset(
+        partition = "0",
+        initialOffset = "0")))
   private void handleRecord(
     @Header(KafkaHeaders.RECEIVED_TOPIC)
     String topic,
index 9ca7eb8..7430b33 100644 (file)
@@ -26,7 +26,7 @@ spring:
     bootstrap-servers: :9092
     client-id: DEV
     consumer:
-      group-id: my-group
+      group-id: my-reprocessing
 logging:
   level:
     root: INFO