package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
+import java.util.stream.IntStream;
@ConditionalOnProperty(
name = "sharding-strategy",
havingValue = "none",
matchIfMissing = true)
- ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+ ChatHome noneShardingChatHome(
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy,
+ Clock clock)
{
- return new SimpleChatHome(chatHomeService);
+ return new SimpleChatHome(
+ storageStrategy,
+ clock,
+ properties.getChatroomBufferSize());
}
@Bean
havingValue = "kafkalike")
ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ StorageStrategy storageStrategy,
+ Clock clock)
{
int numShards = properties.getInmemory().getNumShards();
- ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = shardingStrategy.selectShard(chatRoom.getId());
- if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
- });
- return new ShardedChatHome(chatHomes, shardingStrategy);
- }
-
- @Bean
- InMemoryChatHomeService chatHomeService(
- ChatBackendProperties properties,
- ShardingStrategy shardingStrategy,
- StorageStrategy storageStrategy)
- {
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- shardingStrategy,
- numShards,
- ownedShards,
- storageStrategy.read());
- }
-
- @Bean
- InMemoryChatRoomFactory chatRoomFactory(
- ShardingStrategy strategy,
- Clock clock,
- ChatBackendProperties properties)
- {
- return new InMemoryChatRoomFactory(
- strategy,
- clock,
- properties.getChatroomBufferSize());
+ IntStream
+ .of(properties.getInmemory().getOwnedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+ shard,
+ storageStrategy,
+ clock,
+ properties.getChatroomBufferSize()));
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHome(chatHomes, strategy);
}
@ConditionalOnProperty(