fix: In `onPartitionsAssigned()` wurde der Kafka-Offset ausgegeben
[demos/kafka/training] / src / main / java / de / juplo / kafka / AdderRebalanceListener.java
index 284aff5..ef595ba 100644 (file)
@@ -24,6 +24,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
   private final Consumer<String, String> consumer;
 
   private Instant lastCommit = Instant.EPOCH;
+  private boolean commitsEnabled = true;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -31,12 +32,11 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
       StateDocument document =
           repository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
+      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
       if (document.offset >= 0)
       {
         // Only seek, if a stored offset was found
@@ -53,13 +53,20 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
+      Long offset = consumer.position(tp);
       log.info(
           "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset);
-      repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
+          offset);
+      if (commitsEnabled)
+      {
+        repository.save(new StateDocument(partition, handler.removePartition(partition), offset));
+      }
+      else
+      {
+        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      }
     });
   }
 
@@ -67,6 +74,12 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
   @Override
   public void beforeNextPoll()
   {
+    if (!commitsEnabled)
+    {
+      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      return;
+    }
+
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
@@ -78,4 +91,16 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
       lastCommit = clock.instant();
     }
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }