X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=e9c26fda36f805991de2644711b206e9d52efa31;hb=refs%2Fheads%2Frebalance-listener;hp=bf00b6d7cf7e724a0d3dffa0ad95cbccf5d5b0ce;hpb=61581ed5dfbb70f66390e7c3e9c261c6e6aa74d4;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index bf00b6d..e9c26fd 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -22,10 +22,21 @@ public class ApplicationConfiguration return new ApplicationRecordHandler(); } + @Bean + public ApplicationRebalanceListener rebalanceListener( + ApplicationRecordHandler recordHandler, + ApplicationProperties properties) + { + return new ApplicationRebalanceListener( + recordHandler, + properties.getClientId()); + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, ApplicationProperties properties) { @@ -35,6 +46,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, + rebalanceListener, recordHandler); }