From 4210993f369acfa07ae8d388eb8e32517005de6f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 26 Aug 2022 11:31:55 +0200 Subject: [PATCH] Code an die Implementierung in 'sumup-adder' angeglichen --- .../java/de/juplo/kafka/ApplicationConfiguration.java | 8 ++++---- .../de/juplo/kafka/ApplicationRebalanceListener.java | 11 +++++++++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index cdd587d..64f8738 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,12 +1,12 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.time.Clock; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -35,18 +35,18 @@ public class ApplicationConfiguration @Bean public ApplicationRebalanceListener rebalanceListener( - KafkaConsumer kafkaConsumer, ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, + Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( - kafkaConsumer, recordHandler, adderResults, stateRepository, - properties.getClientId()); + properties.getClientId(), + consumer); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index fad3287..eef0d00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -13,11 +13,11 @@ import java.util.*; @Slf4j public class ApplicationRebalanceListener implements ConsumerRebalanceListener { - private final Consumer consumer; private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; + private final Consumer consumer; private final Set partitions = new HashSet<>(); @@ -51,7 +51,14 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener public void onPartitionsRevoked(Collection partitions) { log.info("{} - Commiting offsets for all previously assigned partitions", id); - consumer.commitSync(); + try + { + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); + } partitions.forEach(tp -> { -- 2.20.1