X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=eef0d00d73e07f8eda6053d3334498f439e6f7d7;hb=5805651c16e07a0710b88c2822894941f67c313e;hp=fad3287155433e696c616b248548e318864756a3;hpb=5bf8218d4a79a3b55056bffb2ed0c91c6bec8917;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index fad3287..eef0d00 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -13,11 +13,11 @@ import java.util.*; @Slf4j public class ApplicationRebalanceListener implements ConsumerRebalanceListener { - private final Consumer consumer; private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; + private final Consumer consumer; private final Set partitions = new HashSet<>(); @@ -51,7 +51,14 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener public void onPartitionsRevoked(Collection partitions) { log.info("{} - Commiting offsets for all previously assigned partitions", id); - consumer.commitSync(); + try + { + consumer.commitSync(); + } + catch (Exception e) + { + log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e); + } partitions.forEach(tp -> {