X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=cdd587dbf79cc565c7a3b9dbd8ede28fd01ed444;hb=12c1ce703ef76b75d995f94d1689c894dde1406a;hp=b58295f29d5a68e586955a73f7a95efaed195fa7;hpb=1c6d263c619010d23bf502c14dda45db11a2baf6;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b58295f..cdd587d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,9 +18,13 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler(AdderResults adderResults) + public ApplicationRecordHandler recordHandler( + AdderResults adderResults, + ApplicationProperties properties) { - return new ApplicationRecordHandler(adderResults); + return new ApplicationRecordHandler( + adderResults, + Optional.ofNullable(properties.getThrottle())); } @Bean @@ -30,18 +35,18 @@ public class ApplicationConfiguration @Bean public ApplicationRebalanceListener rebalanceListener( + KafkaConsumer kafkaConsumer, ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, ApplicationProperties properties) { return new ApplicationRebalanceListener( + kafkaConsumer, recordHandler, adderResults, stateRepository, - properties.getClientId(), - Clock.systemDefaultZone(), - properties.getCommitInterval()); + properties.getClientId()); } @Bean