fix: In `onPartitionsAssigned()` wurde der Kafka-Offset ausgegeben
authorKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 16:25:54 +0000 (18:25 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 16:26:03 +0000 (18:26 +0200)
src/main/java/de/juplo/kafka/AdderRebalanceListener.java

index 7526929..ef595ba 100644 (file)
@@ -32,12 +32,11 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
       StateDocument document =
           repository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
+      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
       if (document.offset >= 0)
       {
         // Only seek, if a stored offset was found
@@ -54,15 +53,15 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
+      Long offset = consumer.position(tp);
       log.info(
           "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset);
+          offset);
       if (commitsEnabled)
       {
-        repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
+        repository.save(new StateDocument(partition, handler.removePartition(partition), offset));
       }
       else
       {