+ return new SimpleChatHome(chatHomeService);
+ }
+
+ @Bean
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ ChatHome kafkalikeShardingChatHome(
+ ChatBackendProperties properties,
+ InMemoryChatHomeService chatHomeService,
+ StorageStrategy storageStrategy)
+ {
+ int numShards = properties.getInmemory().getNumShards();
+ ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ storageStrategy
+ .read()
+ .subscribe(chatRoom ->
+ {
+ int shard = shardingStrategy.selectShard(chatRoom.getId());
+ if (chatHomes[shard] == null)
+ chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+ });
+ return new ShardedChatHome(chatHomes, shardingStrategy);
+ }
+
+ @Bean
+ InMemoryChatHomeService chatHomeService(
+ ChatBackendProperties properties,
+ ShardingStrategy shardingStrategy,
+ StorageStrategy storageStrategy)
+ {
+ ShardingStrategyType sharding =
+ properties.getInmemory().getShardingStrategy();
+ int numShards = sharding == ShardingStrategyType.none
+ ? 1
+ : properties.getInmemory().getNumShards();
+ int[] ownedShards = sharding == ShardingStrategyType.none
+ ? new int[] { 0 }
+ : properties.getInmemory().getOwnedShards();
+ return new InMemoryChatHomeService(
+ shardingStrategy,
+ numShards,
+ ownedShards,
+ storageStrategy.read());