From: Kai Moritz Date: Fri, 26 Aug 2022 09:31:55 +0000 (+0200) Subject: Code an die Implementierung in 'sumup-adder' angeglichen X-Git-Tag: sumup-adder---lvm-2-tage~5^2~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4210993f369acfa07ae8d388eb8e32517005de6f;p=demos%2Fkafka%2Ftraining Code an die Implementierung in 'sumup-adder' angeglichen --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index cdd587d..64f8738 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,12 +1,12 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.time.Clock; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -35,18 +35,18 @@ public class ApplicationConfiguration @Bean public ApplicationRebalanceListener rebalanceListener( - KafkaConsumer kafkaConsumer, ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, + Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( - kafkaConsumer, recordHandler, adderResults, stateRepository, - properties.getClientId()); + properties.getClientId(), + consumer); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index fad3287..eef0d00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -13,11 +13,11 @@ import java.util.*; @Slf4j public class ApplicationRebalanceListener implements ConsumerRebalanceListener { - private final Consumer consumer; private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; + private final Consumer consumer; private final Set partitions = new HashSet<>(); @@ -51,7 +51,14 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener public void onPartitionsRevoked(Collection partitions) { log.info("{} - Commiting offsets for all previously assigned partitions", id); - consumer.commitSync(); + try + { + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); + } partitions.forEach(tp -> {