X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=58e111734d40b8c20a7bd39ff710951ece8a4353;hb=900422dccb5a92fbceac34caa5e614b0d7f05ad7;hp=525c427a51b2944193f17573217018d5527eeb20;hpb=3ccd79c817eaedda7cef11d027d59e3fe1911e28;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 525c427a..58e11173 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,16 +39,12 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ChannelTaskRunner channelTaskRunner( - ChannelTaskExecutor infoChannelTaskExecutor, - ChannelTaskExecutor dataChannelTaskExecutor) + KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer() { - return new ChannelTaskRunner( - infoChannelTaskExecutor, - dataChannelTaskExecutor); + return new KafkaServicesThreadPoolTaskExecutorCustomizer(); } - @Bean + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, @@ -78,7 +74,7 @@ public class KafkaServicesConfiguration }; } - @Bean + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor dataChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, @@ -154,7 +150,7 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getKafka().getPollingInterval(), - properties.getChatroomBufferSize(), + properties.getChatroomHistoryLimit(), clock, channelMediator, shardingPublisherStrategy);