From: Kai Moritz Date: Sun, 24 Jul 2022 16:22:00 +0000 (+0200) Subject: Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen X-Git-Tag: wip-DEPRECATED~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1407cf25602507e608b9ad64b84f19a8e6945896;p=demos%2Fkafka%2Ftraining Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen * Es wird jetzt nur noch dann ein expliziter Seek durchgeführt, wenn eine gespeicherte Offset-Position gefunden wurde. * Andernfalls wird der von Kafka initialisierte Ausgansgs-Offset verwendet. * Welchen Offset Kafka vorgibt, hängt von `auto.offset.rest` ab! --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 39e7b1b..3afb4c0 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -75,7 +75,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl repository .findById(Integer.toString(partition)) .orElse(new StatisticsDocument(partition)); - consumer.seek(tp, document.offset); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } seen.put(partition, document.statistics); }); } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java index 28264ec..1244f45 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -14,7 +14,7 @@ public class StatisticsDocument { @Id public String id; - public long offset; + public long offset = -1l; public Map statistics; public StatisticsDocument()