Fehler im Commit-Verhalten korrigiert: Bei Logik-Fehler, kein Commit
authorKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 11:52:48 +0000 (13:52 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 11:59:07 +0000 (13:59 +0200)
* Die Implementierung sieht vor, dass bei einer unerwarteten Exception
  (i.d.R. ein Fehler in der Fachlogik) kein Commit durchgeführt wird.
* Ansonsten müsste in der Situation ein expliziter Seek der Offstes auf
  die Positionen der vor dem Auftreten des Fehlers verarbeiteten
  Nachrichten durchgeführt werden, damit es nicht zu einem Verlust von
  Nachrichten kommt.
* Dieses Verhalten wurde durch die Verlagerung des Commits in den
  Rebalance-Listener unterwandert, da der Commit dort auch im Falle
  einer unerwarteten Exception durchgeführt wurde.
* Als Korrektur wurde hier eine Methode eingeführt, über die der
  Commit im Rebalance in dieser Situation unterdrückt werden kann.

src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index eef0d00..d319295 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.*;
@@ -11,7 +10,7 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
@@ -21,6 +20,8 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 
   private final Set<Integer> partitions = new HashSet<>();
 
+  private boolean commitsEnabled = true;
+
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
@@ -50,14 +51,17 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
+    if (commitsEnabled)
     {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+      log.info("{} - Commiting offsets for all previously assigned partitions", id);
+      try
+      {
+        consumer.commitSync();
+      }
+      catch (Exception e)
+      {
+        log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+      }
     }
 
     partitions.forEach(tp ->
@@ -79,4 +83,16 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
       stateRepository.save(new StateDocument(partition, state, results));
     });
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java
new file mode 100644 (file)
index 0000000..8aa92c0
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  void enableCommits();
+  void disableCommits();
+}
index f0e74d3..63fc10f 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
+  private final CommittingConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,6 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
+      rebalanceListener.enableCommits();
       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
@@ -89,7 +90,8 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
index 595ef89..7335770 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -67,7 +66,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       ConsumerRebalanceListener rebalanceListener;
+       CommittingConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;
 
@@ -213,7 +212,7 @@ abstract class GenericApplicationTests<K, V>
                        Long expected = offsetsToCheck.get(tp) + 1;
                        log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
                        assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
+                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
                                        .isLessThanOrEqualTo(expected);
                        isOffsetBehindSeen.add(offset < expected);
                });