import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
+import java.util.UUID;
@ConditionalOnProperty(
return new ChatHome(chatHomeService);
}
- ChatHome kafkalikeShardingChatHome(
- ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
- {
- int numShards = properties.getInmemory().getNumShards();
- ChatHome[] chatHomes = new ChatHome[numShards];
- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = chatRoom.getShard();
- if (chatHomes[shard] == null)
- chatHomes[shard] = new ChatHome(chatHomeService, shard);
- });
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
- }
-
@Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "none",
- matchIfMissing = true)
- InMemoryChatHomeService noneShardingChatHomeService(
+ InMemoryChatHomeService chatHomeService(
ChatBackendProperties properties,
StorageStrategy storageStrategy)
{
ShardingStrategyType sharding =
properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
+ int numShards;
+ int[] ownedShards;
+ ShardingStrategy shardingStrategy;
- ChatHome[] chatHomes = new ChatHome[numShards];
- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = chatRoom.getShard();
- if (chatHomes[shard] == null)
- chatHomes[shard] = new ChatHome(chatHomeService, shard);
- });
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ switch (sharding)
+ {
+ case none:
+ numShards = 1;
+ ownedShards = new int[] { 0 };
+ shardingStrategy = (UUID id) -> 0;
+ break;
+ case kafkalike:
+ numShards = properties.getInmemory().getNumShards();
+ ownedShards = properties.getInmemory().getOwnedShards();
+ shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown sharding strategy: " + sharding);
+ }
return new InMemoryChatHomeService(
numShards,
storageStrategy.read());
}
- @Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "kafkalike")
- InMemoryChatHomeService kafkalikeShardingChatHomeService(
- ChatBackendProperties properties,
- StorageStrategy storageStrategy)
- {
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- numShards,
- ownedShards,
- shardingStrategy,
- storageStrategy.read());
- }
-
@Bean
InMemoryChatRoomFactory chatRoomFactory(
InMemoryChatHomeService service,