X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=d48c027bc09bf860465de460182cd2e74f73fe0e;hb=9511a89368c96d0b5f09d55adaaed5515c578dcc;hp=b077a90efc137edc525dc5d57ccc7174b00c5091;hpb=2d84eda74475aaffff11ddfebe56d309b9aff2e9;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b077a90..d48c027 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -18,12 +18,20 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public WordcountRecordHandler wordcountRecordHandler( + public WordcountRecordHandler wordcountRecordHandler() + { + return new WordcountRecordHandler(); + } + + @Bean + public WordcountRebalanceListener wordcountRebalanceListener( + WordcountRecordHandler wordcountRecordHandler, PartitionStatisticsRepository repository, Consumer consumer, ApplicationProperties properties) { - return new WordcountRecordHandler( + return new WordcountRebalanceListener( + wordcountRecordHandler, repository, properties.getClientId(), properties.getTopic(), @@ -36,6 +44,7 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + WordcountRebalanceListener wordcountRebalanceListener, WordcountRecordHandler wordcountRecordHandler, ApplicationProperties properties) { @@ -45,6 +54,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, + wordcountRebalanceListener, wordcountRecordHandler); }