From: Kai Moritz Date: Sun, 24 Jul 2022 16:22:00 +0000 (+0200) Subject: Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6ded138c6b8139da2cdc13f2380b5f5a4e51cc4e;p=demos%2Fkafka%2Ftraining Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen * Es wird jetzt nur noch dann ein expliziter Seek durchgeführt, wenn eine gespeicherte Offset-Position gefunden wurde. * Andernfalls wird der von Kafka initialisierte Ausgansgs-Offset verwendet. * Welchen Offset Kafka vorgibt, hängt von `auto.offset.rest` ab! --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3d154c2..a93ae2c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -75,7 +75,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl repository .findById(Integer.toString(partition)) .orElse(new StatisticsDocument(partition)); - consumer.seek(tp, document.offset); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } seen.put(partition, document.statistics); }); } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java index 28264ec..1244f45 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -14,7 +14,7 @@ public class StatisticsDocument { @Id public String id; - public long offset; + public long offset = -1l; public Map statistics; public StatisticsDocument()