Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
index 59b420a..444b7b7 100644 (file)
@@ -25,6 +25,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
   private final Consumer<String, Long> consumer;
 
   private Instant lastCommit = Instant.EPOCH;
+  private boolean commitsEnabled = true;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -59,8 +60,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
           id,
           partition,
           offset);
-      Map<String, Long> removed = recordHandler.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, removed, offset));
+      if (commitsEnabled)
+      {
+        Map<String, Long> removed = recordHandler.removePartition(partition);
+        stateRepository.save(new StateDocument(partition, removed, offset));
+      }
+      else
+      {
+        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      }
     });
   }
 
@@ -68,6 +76,12 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
   @Override
   public void beforeNextPoll()
   {
+    if (!commitsEnabled)
+    {
+      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      return;
+    }
+
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
@@ -79,4 +93,16 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
       lastCommit = clock.instant();
     }
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }