GRÜN: Korrektur des über die verschärften Tests aufgedeckten Fehlers
authorKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 16:16:34 +0000 (18:16 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 16:16:34 +0000 (18:16 +0200)
src/main/java/de/juplo/kafka/AdderRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java

index 284aff5..7526929 100644 (file)
@@ -24,6 +24,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
   private final Consumer<String, String> consumer;
 
   private Instant lastCommit = Instant.EPOCH;
+  private boolean commitsEnabled = true;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -59,7 +60,14 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
           id,
           partition,
           newOffset);
-      repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
+      if (commitsEnabled)
+      {
+        repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
+      }
+      else
+      {
+        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      }
     });
   }
 
@@ -67,6 +75,12 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
   @Override
   public void beforeNextPoll()
   {
+    if (!commitsEnabled)
+    {
+      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      return;
+    }
+
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
@@ -78,4 +92,16 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
       lastCommit = clock.instant();
     }
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }
index 58557f2..cfba6df 100644 (file)
@@ -42,6 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
+      pollIntervalAwareRebalanceListener.enableCommits();
       consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
 
       while (true)
@@ -91,7 +92,8 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+      pollIntervalAwareRebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
index 8abec12..c59418c 100644 (file)
@@ -6,4 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
 {
   default void beforeNextPoll() {}
+
+  default void enableCommits() {}
+  default void disableCommits() {}
 }