From 6ded138c6b8139da2cdc13f2380b5f5a4e51cc4e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Jul 2022 18:22:00 +0200 Subject: [PATCH] Wenn kein gespeicherter Offset vorliegt, auto.offset.reset von Kafka nutzen MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Es wird jetzt nur noch dann ein expliziter Seek durchgeführt, wenn eine gespeicherte Offset-Position gefunden wurde. * Andernfalls wird der von Kafka initialisierte Ausgansgs-Offset verwendet. * Welchen Offset Kafka vorgibt, hängt von `auto.offset.rest` ab! --- src/main/java/de/juplo/kafka/EndlessConsumer.java | 7 ++++++- src/main/java/de/juplo/kafka/StatisticsDocument.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3d154c2..a93ae2c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -75,7 +75,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl repository .findById(Integer.toString(partition)) .orElse(new StatisticsDocument(partition)); - consumer.seek(tp, document.offset); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } seen.put(partition, document.statistics); }); } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java index 28264ec..1244f45 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -14,7 +14,7 @@ public class StatisticsDocument { @Id public String id; - public long offset; + public long offset = -1l; public Map statistics; public StatisticsDocument() -- 2.20.1