From fea07fb45d8b64189972bbbf9060b1f12a438089 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 16 Sep 2022 20:19:53 +0200 Subject: [PATCH] WIP --- src/main/java/de/juplo/kafka/Application.java | 2 -- .../de/juplo/kafka/ApplicationRebalanceListener.java | 9 ++++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 4e5457c..01fddbc 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -50,14 +50,12 @@ public class Application ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - ConsumerFactory consumerFactory, KafkaProperties kafkaProperties) { return new ApplicationRebalanceListener( recordHandler, adderResults, stateRepository, - consumerFactory.createConsumer(), kafkaProperties.getConsumer().getGroupId()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index bffc146..b8677d1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -17,13 +17,14 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; - private final Consumer consumer; private final String id; private final Set partitions = new HashSet<>(); @Override - public void onPartitionsAssigned(Collection partitions) + public void onPartitionsAssigned( + Consumer consumer, + Collection partitions) { partitions.forEach(tp -> { @@ -55,7 +56,9 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe } @Override - public void onPartitionsRevoked(Collection partitions) + public void onPartitionsRevokedAfterCommit( + Consumer consumer, + Collection partitions) { partitions.forEach(tp -> { -- 2.20.1