Code an die Implementierung in 'sumup-adder' angeglichen
authorKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 09:31:55 +0000 (11:31 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 10:52:44 +0000 (12:52 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java

index cdd587d..64f8738 100644 (file)
@@ -1,12 +1,12 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.time.Clock;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -35,18 +35,18 @@ public class ApplicationConfiguration
 
   @Bean
   public ApplicationRebalanceListener rebalanceListener(
-      KafkaConsumer<String, String> kafkaConsumer,
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
+      Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
-        kafkaConsumer,
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId());
+        properties.getClientId(),
+        consumer);
   }
 
   @Bean
index fad3287..eef0d00 100644 (file)
@@ -13,11 +13,11 @@ import java.util.*;
 @Slf4j
 public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 {
-  private final Consumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
+  private final Consumer consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
@@ -51,7 +51,14 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
     log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    consumer.commitSync();
+    try
+    {
+      consumer.commitSync();
+    }
+    catch (Exception e)
+    {
+      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+    }
 
     partitions.forEach(tp ->
     {