X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=cdd587dbf79cc565c7a3b9dbd8ede28fd01ed444;hb=12c1ce703ef76b75d995f94d1689c894dde1406a;hp=c1bc0192e401788d1b60886095c6546c12d450e3;hpb=2d25525ef70a90709edc48bd9542d1b08a2888a2;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c1bc019..cdd587d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,9 +18,13 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler(AdderResults adderResults) + public ApplicationRecordHandler recordHandler( + AdderResults adderResults, + ApplicationProperties properties) { - return new ApplicationRecordHandler(adderResults); + return new ApplicationRecordHandler( + adderResults, + Optional.ofNullable(properties.getThrottle())); } @Bean @@ -30,12 +35,14 @@ public class ApplicationConfiguration @Bean public ApplicationRebalanceListener rebalanceListener( + KafkaConsumer kafkaConsumer, ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, ApplicationProperties properties) { return new ApplicationRebalanceListener( + kafkaConsumer, recordHandler, adderResults, stateRepository,