]> juplo.de Git - demos/kafka/training/commitdiff
Fehler im Commit-Verhalten korrigiert: Bei Logik-Fehler, kein Commit
authorKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 11:52:48 +0000 (13:52 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 11:59:07 +0000 (13:59 +0200)
* Die Implementierung sieht vor, dass bei einer unerwarteten Exception
  (i.d.R. ein Fehler in der Fachlogik) kein Commit durchgeführt wird.
* Ansonsten müsste in der Situation ein expliziter Seek der Offstes auf
  die Positionen der vor dem Auftreten des Fehlers verarbeiteten
  Nachrichten durchgeführt werden, damit es nicht zu einem Verlust von
  Nachrichten kommt.
* Dieses Verhalten wurde durch die Verlagerung des Commits in den
  Rebalance-Listener unterwandert, da der Commit dort auch im Falle
  einer unerwarteten Exception durchgeführt wurde.
* Als Korrektur wurde hier eine Methode eingeführt, über die der
  Commit im Rebalance in dieser Situation unterdrückt werden kann.

src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index eef0d00d73e07f8eda6053d3334498f439e6f7d7..d31929519e61a4a6fc18f84e4590a7295f757c4f 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.*;
@@ -11,7 +10,7 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
@@ -21,6 +20,8 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 
   private final Set<Integer> partitions = new HashSet<>();
 
+  private boolean commitsEnabled = true;
+
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
@@ -50,14 +51,17 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
+    if (commitsEnabled)
     {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+      log.info("{} - Commiting offsets for all previously assigned partitions", id);
+      try
+      {
+        consumer.commitSync();
+      }
+      catch (Exception e)
+      {
+        log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+      }
     }
 
     partitions.forEach(tp ->
@@ -79,4 +83,16 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
       stateRepository.save(new StateDocument(partition, state, results));
     });
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java
new file mode 100644 (file)
index 0000000..8aa92c0
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  void enableCommits();
+  void disableCommits();
+}
index f0e74d39fd1737d59fdaa22f98815baca4c61df3..63fc10f98e4808f22f9c27f2cab828ef7f41aee9 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
+  private final CommittingConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,6 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
+      rebalanceListener.enableCommits();
       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
@@ -89,7 +90,8 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
index 595ef89d29001fbc39a34cf1d757e8ecadc7d69f..7335770aa01b87082c1f1df860066878d44fddd6 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -67,7 +66,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       ConsumerRebalanceListener rebalanceListener;
+       CommittingConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;
 
@@ -213,7 +212,7 @@ abstract class GenericApplicationTests<K, V>
                        Long expected = offsetsToCheck.get(tp) + 1;
                        log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
                        assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
+                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
                                        .isLessThanOrEqualTo(expected);
                        isOffsetBehindSeen.add(offset < expected);
                });