refactor: Handling der Partitionen in WordcountRebalanceListener
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
index b077a90..0d17823 100644 (file)
@@ -25,17 +25,31 @@ public class ApplicationConfiguration
   {
     return new WordcountRecordHandler(
         repository,
-        properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
         properties.getCommitInterval(),
         consumer);
   }
 
+  @Bean
+  public WordcountRebalanceListener wordcountRebalanceListener(
+      WordcountRecordHandler wordcountRecordHandler,
+      PartitionStatisticsRepository repository,
+      Consumer<String, String> consumer,
+      ApplicationProperties properties)
+  {
+    return new WordcountRebalanceListener(
+        wordcountRecordHandler,
+        repository,
+        properties.getClientId(),
+        consumer);
+  }
+
   @Bean
   public EndlessConsumer<String, String> endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
+      WordcountRebalanceListener wordcountRebalanceListener,
       WordcountRecordHandler wordcountRecordHandler,
       ApplicationProperties properties)
   {
@@ -45,6 +59,7 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
+            wordcountRebalanceListener,
             wordcountRecordHandler);
   }