From a00689850e1803154fbc553ec8d36b0b8207c9cd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 24 Feb 2023 09:47:35 +0100 Subject: [PATCH] WIP --- .../kafka/chat/backend/domain/ChatHome.java | 37 ++++++++++++++-- .../chat/backend/domain/ShardedChatHome.java | 20 ++++++++- .../chat/backend/domain/SimpleChatHome.java | 43 ------------------- .../domain/UnknownChatroomException.java | 26 +++++++++++ .../inmemory/InMemoryChatHomeService.java | 9 ++++ .../InMemoryServicesConfiguration.java | 7 ++- ...pleChatHomeTest.java => ChatHomeTest.java} | 6 +-- .../AbstractStorageStrategyIT.java | 2 +- 8 files changed, 94 insertions(+), 56 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java rename src/test/java/de/juplo/kafka/chat/backend/domain/{SimpleChatHomeTest.java => ChatHomeTest.java} (89%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..955369d7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,14 +1,43 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.UUID; +import java.util.*; -public interface ChatHome +@Slf4j +public class ChatHome implements ChatHome { - Mono getChatRoom(UUID id); + private final ChatHomeService service; + private final int shard; - Flux getChatRooms(); + + public ChatHome(ChatHomeService service, int shard) + { + log.info("Created SimpleChatHome for shard {}", shard); + this.service = service; + this.shard = shard; + } + + public ChatHome(ChatHomeService service) + { + this(service, 0); + } + + + @Override + public Mono getChatRoom(UUID id) + { + return service + .getChatRoom(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + @Override + public Flux getChatRooms() + { + return service.getChatRooms(shard); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index 4b8c7f16..3dc66682 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -5,6 +5,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -40,7 +41,24 @@ public class ShardedChatHome implements ChatHome @Override public Mono getChatRoom(UUID id) { - return chatHomes[selectShard(id)].getChatRoom(id); + int shard = selectShard(id); + if (ownedShards.contains(shard)) + { + return chatHomes[shard].getChatRoom(id); + } + else + { + int[] ownedShards = new int[this.ownedShards.size()]; + Iterator iterator = this.ownedShards.iterator(); + for (int i = 0; iterator.hasNext(); i++) + { + ownedShards[i] = iterator.next(); + } + return Mono.error(new UnknownChatroomException( + id, + shard, + ownedShards)); + } } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java deleted file mode 100644 index 11542edd..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@Slf4j -public class SimpleChatHome implements ChatHome -{ - private final ChatHomeService service; - private final int shard; - - - public SimpleChatHome(ChatHomeService service, int shard) - { - log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; - this.shard = shard; - } - - public SimpleChatHome(ChatHomeService service) - { - this(service, 0); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return service - .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRooms() - { - return service.getChatRooms(shard); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java index 1f70f110..20208e2e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java @@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; +import java.util.Arrays; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; public class UnknownChatroomException extends RuntimeException { @Getter private final UUID chatroomId; + @Getter + private final Optional shard; + @Getter + private final Optional ownedShards; public UnknownChatroomException(UUID chatroomId) { super("Chatroom does not exist: " + chatroomId); this.chatroomId = chatroomId; + this.shard = Optional.empty(); + this.ownedShards = Optional.empty(); + } + + public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards) + { + super( + "Chatroom does not exist (here): " + + chatroomId + + " shard=" + + shard + + ", owned=" + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(","))); + this.chatroomId = chatroomId; + this.shard = Optional.of(shard); + this.ownedShards = Optional.of(ownedShards); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 8f262a0b..c33103c0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,6 +14,8 @@ import java.util.*; public class InMemoryChatHomeService implements ChatHomeService { private final Map[] chatrooms; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; public InMemoryChatHomeService( @@ -67,4 +70,10 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux.fromStream(chatrooms[shard].values().stream()); } + + + private int selectShard(UUID chatroomId) + { + return shardingStrategy.selectShard(chatroomId); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index de504485..d8b49b37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -6,7 +6,6 @@ import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -31,7 +30,7 @@ public class InMemoryServicesConfiguration matchIfMissing = true) ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) { - return new SimpleChatHome(chatHomeService); + return new ChatHome(chatHomeService); } @Bean @@ -45,14 +44,14 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy) { int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; + ChatHome[] chatHomes = new ChatHome[numShards]; storageStrategy .read() .subscribe(chatRoom -> { int shard = chatRoom.getShard(); if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); + chatHomes[shard] = new ChatHome(chatHomeService, shard); }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHome(chatHomes, strategy); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java similarity index 89% rename from src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java rename to src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index 5b53607f..8ae19fdc 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -14,7 +14,7 @@ import static org.mockito.Mockito.when; import static pl.rzrz.assertj.reactor.Assertions.assertThat; -public class SimpleChatHomeTest +public class ChatHomeTest { @Test @DisplayName("Assert chatroom is delivered, if it exists") @@ -30,7 +30,7 @@ public class SimpleChatHomeTest mock(ChatRoomService.class), 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + ChatHome chatHome = new ChatHome(chatHomeService); // When Mono mono = chatHome.getChatRoom(chatRoom.getId()); @@ -46,7 +46,7 @@ public class SimpleChatHomeTest // Given ChatHomeService chatHomeService = mock(ChatHomeService.class); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + ChatHome chatHome = new ChatHome(chatHomeService); // When Mono mono = chatHome.getChatRoom(UUID.randomUUID()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 3ce527eb..83e905b9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -23,7 +23,7 @@ public abstract class AbstractStorageStrategyIT protected void start() { StorageStrategyITConfig config = getConfig(); - chathome = new SimpleChatHome(config.getChatHomeService()); + chathome = new ChatHome(config.getChatHomeService()); chatRoomFactory = config.getChatRoomFactory(); } -- 2.20.1