From: Kai Moritz Date: Fri, 16 Sep 2022 18:19:53 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=fea07fb45d8b64189972bbbf9060b1f12a438089;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 4e5457c..01fddbc 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -50,14 +50,12 @@ public class Application ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - ConsumerFactory consumerFactory, KafkaProperties kafkaProperties) { return new ApplicationRebalanceListener( recordHandler, adderResults, stateRepository, - consumerFactory.createConsumer(), kafkaProperties.getConsumer().getGroupId()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index bffc146..b8677d1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -17,13 +17,14 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; - private final Consumer consumer; private final String id; private final Set partitions = new HashSet<>(); @Override - public void onPartitionsAssigned(Collection partitions) + public void onPartitionsAssigned( + Consumer consumer, + Collection partitions) { partitions.forEach(tp -> { @@ -55,7 +56,9 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe } @Override - public void onPartitionsRevoked(Collection partitions) + public void onPartitionsRevokedAfterCommit( + Consumer consumer, + Collection partitions) { partitions.forEach(tp -> {