WIP
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 18:19:53 +0000 (20:19 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 18:19:53 +0000 (20:19 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java

index 4e5457c..01fddbc 100644 (file)
@@ -50,14 +50,12 @@ public class Application
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      ConsumerFactory<String, Message> consumerFactory,
       KafkaProperties kafkaProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        consumerFactory.createConsumer(),
         kafkaProperties.getConsumer().getGroupId());
   }
 
index bffc146..b8677d1 100644 (file)
@@ -17,13 +17,14 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
-  private final Consumer consumer;
   private final String id;
 
   private final Set<Integer> partitions = new HashSet<>();
 
   @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  public void onPartitionsAssigned(
+    Consumer<? , ?> consumer,
+    Collection<TopicPartition> partitions)
   {
     partitions.forEach(tp ->
     {
@@ -55,7 +56,9 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
   }
 
   @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  public void onPartitionsRevokedAfterCommit(
+    Consumer<?, ?> consumer,
+    Collection<TopicPartition> partitions)
   {
     partitions.forEach(tp ->
     {