+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
- private final ChatHome[] chatHomes;
- private final Set<Integer> ownedShards;
- private final ShardingStrategy shardingStrategy;
-
-
- public ShardedChatHome(
- ChatHome[] chatHomes,
- ShardingStrategy shardingStrategy)
- {
- this.chatHomes = chatHomes;
- this.shardingStrategy = shardingStrategy;
- this.ownedShards = new HashSet<>();
- for (int shard = 0; shard < chatHomes.length; shard++)
- if(chatHomes[shard] != null)
- this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- int shard = selectShard(id);
- if (ownedShards.contains(shard))
- {
- return chatHomes[shard].getChatRoom(id);
- }
- else
- {
- int[] ownedShards = new int[this.ownedShards.size()];
- Iterator<Integer> iterator = this.ownedShards.iterator();
- for (int i = 0; iterator.hasNext(); i++)
- {
- ownedShards[i] = iterator.next();
- }
- return Mono.error(new UnknownChatroomException(
- id,
- shard,
- ownedShards));
- }
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
- }
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
-}
public class InMemoryServicesConfiguration
{
@Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "none",
- matchIfMissing = true)
- ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+ ChatHome chatHome(InMemoryChatHomeService chatHomeService)
{
return new ChatHome(chatHomeService);
}
- @Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "kafkalike")
ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
InMemoryChatHomeService chatHomeService,
}
@Bean
- InMemoryChatHomeService chatHomeService(
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
+ InMemoryChatHomeService noneShardingChatHomeService(
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy)
+ {
+ ShardingStrategyType sharding =
+ properties.getInmemory().getShardingStrategy();
+
+ int numShards = sharding == ShardingStrategyType.none
+ ? 1
+ : properties.getInmemory().getNumShards();
+ int[] ownedShards = sharding == ShardingStrategyType.none
+ ? new int[] { 0 }
+ : properties.getInmemory().getOwnedShards();
+
+ ChatHome[] chatHomes = new ChatHome[numShards];
+ storageStrategy
+ .read()
+ .subscribe(chatRoom ->
+ {
+ int shard = chatRoom.getShard();
+ if (chatHomes[shard] == null)
+ chatHomes[shard] = new ChatHome(chatHomeService, shard);
+ });
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+ return new InMemoryChatHomeService(
+ numShards,
+ ownedShards,
+ shardingStrategy,
+ storageStrategy.read());
+ }
+
+ @Bean
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ InMemoryChatHomeService kafkalikeShardingChatHomeService(
ChatBackendProperties properties,
StorageStrategy storageStrategy)
{
return new InMemoryChatHomeService(
numShards,
ownedShards,
+ shardingStrategy,
storageStrategy.read());
}