import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
havingValue = "kafkalike")
ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
havingValue = "kafkalike")
ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
{
int numShards = properties.getInmemory().getNumShards();
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
{
int numShards = properties.getInmemory().getNumShards();
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = chatRoom.getShard();
- if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
- });
+ IntStream
+ .of(properties.getInmemory().getOwnedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHome(chatHomes, strategy);
}
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHome(chatHomes, strategy);
}