From: Kai Moritz Date: Fri, 24 Feb 2023 10:55:43 +0000 (+0100) Subject: WIP X-Git-Tag: wip-sharding~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e766e87d0b47709a44daa4011794900eb28de593;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 6c6951ae..82f6da5b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHome; @@ -12,6 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.util.UUID; @ConditionalOnProperty( @@ -28,55 +28,33 @@ public class InMemoryServicesConfiguration return new ChatHome(chatHomeService); } - ChatHome kafkalikeShardingChatHome( - ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) - { - int numShards = properties.getInmemory().getNumShards(); - ChatHome[] chatHomes = new ChatHome[numShards]; - storageStrategy - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new ChatHome(chatHomeService, shard); - }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); - } - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - InMemoryChatHomeService noneShardingChatHomeService( + InMemoryChatHomeService chatHomeService( ChatBackendProperties properties, StorageStrategy storageStrategy) { ShardingStrategyType sharding = properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); + int numShards; + int[] ownedShards; + ShardingStrategy shardingStrategy; - ChatHome[] chatHomes = new ChatHome[numShards]; - storageStrategy - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new ChatHome(chatHomeService, shard); - }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + switch (sharding) + { + case none: + numShards = 1; + ownedShards = new int[] { 0 }; + shardingStrategy = (UUID id) -> 0; + break; + case kafkalike: + numShards = properties.getInmemory().getNumShards(); + ownedShards = properties.getInmemory().getOwnedShards(); + shardingStrategy = new KafkaLikeShardingStrategy(numShards); + break; + default: + throw new IllegalArgumentException("Unknown sharding strategy: " + sharding); + } return new InMemoryChatHomeService( numShards, @@ -85,30 +63,6 @@ public class InMemoryServicesConfiguration storageStrategy.read()); } - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") - InMemoryChatHomeService kafkalikeShardingChatHomeService( - ChatBackendProperties properties, - StorageStrategy storageStrategy) - { - ShardingStrategyType sharding = - properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); - return new InMemoryChatHomeService( - numShards, - ownedShards, - shardingStrategy, - storageStrategy.read()); - } - @Bean InMemoryChatRoomFactory chatRoomFactory( InMemoryChatHomeService service,