X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=5367fdf0841c873feda731b860add63fcf2b4a16;hb=e7150b6822c45c520db73a96785dc0a8a81f503b;hp=91115ea5d2e84997040b1462b7a9e83656e22349;hpb=df2360875c75f25798f2f55d1e77fc8d46cc31b0;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 91115ea5..5367fdf0 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -7,15 +7,26 @@ import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; import java.time.Clock; +import java.time.ZoneId; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; @ConditionalOnProperty( @@ -23,65 +34,118 @@ import java.time.Clock; name = "services", havingValue = "kafka") @Configuration +@Slf4j public class KafkaServicesConfiguration implements ApplicationRunner { - @Bean - ChatHome kafkaChatHome( - ChatBackendProperties properties, - KafkaChatHomeService chatHomeService) + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + ConfigurableApplicationContext context; + + @Autowired + ChatMessageChannel chatMessageChannel; + @Autowired + ChatRoomChannel chatRoomChannel; + + CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture chatMessageChannelConsumerJob; + + + @Override + public void run(ApplicationArguments args) throws Exception { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - for (int shard = 0; shard < numShards; shard++) - { - - } - .read() - .subscribe(chatRoom -> + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + log.info("Starting the consumer for the ChatMessageChannel"); + chatMessageChannelConsumerJob = taskExecutor + .submitCompletable(chatMessageChannel) + .exceptionally(e -> { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); + log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + return null; }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); } + @PreDestroy + public void joinChatRoomChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } + + @PreDestroy + public void joinChatMessageChannelConsumerJob() + { + log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); + chatMessageChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatMessageChannel"); + } + + @Bean - KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties) + ChatHome kafkaChatHome( + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) + { + return new KafkaChatHome(shardingStrategy, chatMessageChannel); + } + + @Bean + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) { - ShardingStrategyType sharding = - properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); - return new InMemoryChatHomeService( - numShards, - ownedShards, - storageStrategy.read()); + return new KafkaChatRoomFactory(chatRoomChannel); } @Bean - InMemoryChatRoomFactory chatRoomFactory( - InMemoryChatHomeService service, - ShardingStrategy strategy, - Clock clock, - ChatBackendProperties properties) + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel, + Clock clock) { - return new InMemoryChatRoomFactory( - service, - strategy, + return new ChatRoomChannel( + properties.getKafka().getTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + shardingStrategy, + chatMessageChannel, clock, properties.getChatroomBufferSize()); } @Bean - ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId) { - return new KafkaLikeShardingStrategy( + return new ChatMessageChannel( + properties.getKafka().getTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, properties.getKafka().getNumPartitions()); } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } }