]> juplo.de Git - demos/kafka/training/commitdiff
Producer liest nur Partition 0, stets von Offset 0 spring-consumer--topicpartition-DEPRECATED
authorKai Moritz <kai@juplo.de>
Sun, 13 Nov 2022 16:08:36 +0000 (17:08 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 13 Nov 2022 17:03:44 +0000 (18:03 +0100)
* Der Producer benutzt jetzt `assign()`
* Da er sich beim Speichern der Offsets mit der Consumer-Group in die
  Quere kommt, muss auch die Group-ID angepasst werden!

src/main/java/de/juplo/kafka/SimpleConsumer.java
src/main/resources/application.yml

index fe0479f1a133dbc42466befb2f1fe4e056b3881b..f94f2523c7bd0f7564173b1bfc4e67bb422b31cb 100644 (file)
@@ -3,6 +3,8 @@ package de.juplo.kafka;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.Payload;
@@ -17,7 +19,12 @@ public class SimpleConsumer
   private String id;
   private long consumed = 0;
 
-  @KafkaListener(topics = "${simple.consumer.topic}")
+  @KafkaListener(
+    topicPartitions = @TopicPartition(
+      topic = "${simple.consumer.topic}",
+      partitionOffsets = @PartitionOffset(
+        partition = "0",
+        initialOffset = "0")))
   private void handleRecord(
     @Header(KafkaHeaders.RECEIVED_TOPIC)
     String topic,
index 9ca7eb847220ca82dd3aa25216c71dac8ff1ec3f..7430b33f79615314365dd0f423d3d224fa45ee6a 100644 (file)
@@ -26,7 +26,7 @@ spring:
     bootstrap-servers: :9092
     client-id: DEV
     consumer:
-      group-id: my-group
+      group-id: my-reprocessing
 logging:
   level:
     root: INFO