Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 16:22:00 +0000 (18:22 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:44:56 +0000 (16:44 +0200)
* Es wird jetzt nur noch dann ein expliziter Seek durchgeführt, wenn eine
  gespeicherte Offset-Position gefunden wurde.
* Andernfalls wird der von Kafka initialisierte Ausgansgs-Offset verwendet.
* Welchen Offset Kafka vorgibt, hängt von `auto.offset.rest` ab!

src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

index 3d154c2..a93ae2c 100644 (file)
@@ -75,7 +75,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           repository
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
-      consumer.seek(tp, document.offset);
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
       seen.put(partition, document.statistics);
     });
   }
index 28264ec..1244f45 100644 (file)
@@ -14,7 +14,7 @@ public class StatisticsDocument
 {
   @Id
   public String id;
-  public long offset;
+  public long offset = -1l;
   public Map<String, Long> statistics;
 
   public StatisticsDocument()