- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = shardingStrategy.selectShard(chatRoom.getId());
- if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
- });
- return new ShardedChatHome(chatHomes, shardingStrategy);
+ IntStream
+ .of(properties.getInmemory().getOwnedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHome(chatHomes, strategy);