Rückbau auf einen Consumer, der in `onPartitionsRevoked()` immer committed
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
index fad3287..eef0d00 100644 (file)
@@ -13,11 +13,11 @@ import java.util.*;
 @Slf4j
 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 {
-  private final Consumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
+  private final Consumer consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
@@ -51,7 +51,14 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
     log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    consumer.commitSync();
+    try
+    {
+      consumer.commitSync();
+    }
+    catch (Exception e)
+    {
+      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+    }
 
     partitions.forEach(tp ->
     {