From 472e5b7fe90e4fc845efeb8e30719187cc6e285f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 24 Feb 2023 11:31:07 +0100 Subject: [PATCH] WIP --- .../chat/backend/domain/ShardedChatHome.java | 77 ------------------- .../InMemoryServicesConfiguration.java | 57 +++++++++++--- 2 files changed, 45 insertions(+), 89 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java deleted file mode 100644 index 3dc66682..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ /dev/null @@ -1,77 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - - -@Slf4j -public class ShardedChatHome implements ChatHome -{ - private final ChatHome[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHome( - ChatHome[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono getChatRoom(UUID id) - { - int shard = selectShard(id); - if (ownedShards.contains(shard)) - { - return chatHomes[shard].getChatRoom(id); - } - else - { - int[] ownedShards = new int[this.ownedShards.size()]; - Iterator iterator = this.ownedShards.iterator(); - for (int i = 0; iterator.hasNext(); i++) - { - ownedShards[i] = iterator.next(); - } - return Mono.error(new UnknownChatroomException( - id, - shard, - ownedShards)); - } - } - - @Override - public Flux getChatRooms() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); - } - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index d8b49b37..6c6951ae 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -23,21 +23,11 @@ import java.time.Clock; public class InMemoryServicesConfiguration { @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) + ChatHome chatHome(InMemoryChatHomeService chatHomeService) { return new ChatHome(chatHomeService); } - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") ChatHome kafkalikeShardingChatHome( ChatBackendProperties properties, InMemoryChatHomeService chatHomeService, @@ -58,7 +48,49 @@ public class InMemoryServicesConfiguration } @Bean - InMemoryChatHomeService chatHomeService( + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "none", + matchIfMissing = true) + InMemoryChatHomeService noneShardingChatHomeService( + ChatBackendProperties properties, + StorageStrategy storageStrategy) + { + ShardingStrategyType sharding = + properties.getInmemory().getShardingStrategy(); + + int numShards = sharding == ShardingStrategyType.none + ? 1 + : properties.getInmemory().getNumShards(); + int[] ownedShards = sharding == ShardingStrategyType.none + ? new int[] { 0 } + : properties.getInmemory().getOwnedShards(); + + ChatHome[] chatHomes = new ChatHome[numShards]; + storageStrategy + .read() + .subscribe(chatRoom -> + { + int shard = chatRoom.getShard(); + if (chatHomes[shard] == null) + chatHomes[shard] = new ChatHome(chatHomeService, shard); + }); + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + + return new InMemoryChatHomeService( + numShards, + ownedShards, + shardingStrategy, + storageStrategy.read()); + } + + @Bean + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "kafkalike") + InMemoryChatHomeService kafkalikeShardingChatHomeService( ChatBackendProperties properties, StorageStrategy storageStrategy) { @@ -73,6 +105,7 @@ public class InMemoryServicesConfiguration return new InMemoryChatHomeService( numShards, ownedShards, + shardingStrategy, storageStrategy.read()); } -- 2.20.1