X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=e08cff411c72f076d78e5442a40c0a6ee41e2fca;hb=5805651c16e07a0710b88c2822894941f67c313e;hp=5e1f8fbcbc99638eeb0a2aa0ecb51da8e283685a;hpb=e5c029af3690ae8ebed729af4f06296cef13fa3c;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5e1f8fb..e08cff4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,12 +1,13 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.time.Clock; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,23 +18,36 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler() + public ApplicationRecordHandler recordHandler( + AdderResults adderResults, + ApplicationProperties properties) + { + return new ApplicationRecordHandler( + adderResults, + Optional.ofNullable(properties.getThrottle()), + properties.getClientId()); + } + + @Bean + public AdderResults adderResults() { - return new ApplicationRecordHandler(); + return new AdderResults(); } @Bean public ApplicationRebalanceListener rebalanceListener( ApplicationRecordHandler recordHandler, + AdderResults adderResults, StateRepository stateRepository, + Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( recordHandler, + adderResults, stateRepository, properties.getClientId(), - Clock.systemDefaultZone(), - properties.getCommitInterval()); + consumer); } @Bean