From 4cc907defd4f6e1a52df13e7018eac44963cecdf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 3 Sep 2022 13:21:52 +0200 Subject: [PATCH] =?utf8?q?R=C3=BCckbau=20der=20Ber=C3=BCcksichtigung=20von?= =?utf8?q?=20`auto.offset.rest`,=20wenn=20Offset=20unbekannt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Hier wurde die Deaktivierung der Berücksichtigung des über `auto.offset.rest` konfigurierbaren Verhaltens der Consumer-Bibiliothek für den Fall, dass noch keine Offset-Position bekannt ist, zurückgebaut, um die einzelnen Schritte der Übung leichter nachvollziehbar zu machen. --- .../kafka/ApplicationRebalanceListener.java | 17 ++++++----------- src/main/java/de/juplo/kafka/StateDocument.java | 2 +- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index ca6897c..da72555 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -33,17 +33,12 @@ public class ApplicationRebalanceListener implements RebalanceListener stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - log.info( - "{} - Seeking to offset {} for partition {}", - id, - document.offset, - partition); - } + log.info( + "{} - Offset of next unseen message for partition {}: {}", + id, + partition, + document.offset); + consumer.seek(tp, document.offset); recordHandler.addPartition(partition, document.state); for (String user : document.state.keySet()) { diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index c10a50c..5c4ca22 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -15,7 +15,7 @@ public class StateDocument { @Id public String id; - public long offset = -1l; + public long offset = 0l; public Map state; public Map> results; -- 2.20.1