X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=da725551acf71c392c8135f4e4291d01cf5273c4;hb=4cc907defd4f6e1a52df13e7018eac44963cecdf;hp=ca6897c055d60c362aae1e367e87fc626ce444af;hpb=af8ee013aeeaee41117174e9e02858112621556b;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index ca6897c..da72555 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -33,17 +33,12 @@ public class ApplicationRebalanceListener implements RebalanceListener stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - log.info( - "{} - Seeking to offset {} for partition {}", - id, - document.offset, - partition); - } + log.info( + "{} - Offset of next unseen message for partition {}: {}", + id, + partition, + document.offset); + consumer.seek(tp, document.offset); recordHandler.addPartition(partition, document.state); for (String user : document.state.keySet()) {