From: Kai Moritz Date: Sat, 3 Sep 2022 11:21:52 +0000 (+0200) Subject: Rückbau der Berücksichtigung von `auto.offset.rest`, wenn Offset unbekannt X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4cc907defd4f6e1a52df13e7018eac44963cecdf;p=demos%2Fkafka%2Ftraining Rückbau der Berücksichtigung von `auto.offset.rest`, wenn Offset unbekannt * Hier wurde die Deaktivierung der Berücksichtigung des über `auto.offset.rest` konfigurierbaren Verhaltens der Consumer-Bibiliothek für den Fall, dass noch keine Offset-Position bekannt ist, zurückgebaut, um die einzelnen Schritte der Übung leichter nachvollziehbar zu machen. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index ca6897c..da72555 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -33,17 +33,12 @@ public class ApplicationRebalanceListener implements RebalanceListener stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - log.info( - "{} - Seeking to offset {} for partition {}", - id, - document.offset, - partition); - } + log.info( + "{} - Offset of next unseen message for partition {}: {}", + id, + partition, + document.offset); + consumer.seek(tp, document.offset); recordHandler.addPartition(partition, document.state); for (String user : document.state.keySet()) { diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index c10a50c..5c4ca22 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -15,7 +15,7 @@ public class StateDocument { @Id public String id; - public long offset = -1l; + public long offset = 0l; public Map state; public Map> results;