From: Kai Moritz Date: Fri, 24 Feb 2023 09:59:25 +0000 (+0100) Subject: WIP X-Git-Tag: wip-sharding~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3c2cb6f5f0a06aa06d02d8725eba92e7eaa9ef85;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 955369d7..c6445a03 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,35 +8,25 @@ import java.util.*; @Slf4j -public class ChatHome implements ChatHome +public class ChatHome { private final ChatHomeService service; - private final int shard; - public ChatHome(ChatHomeService service, int shard) - { - log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; - this.shard = shard; - } - public ChatHome(ChatHomeService service) { - this(service, 0); + log.info("Created ChatHome with ChatHomeService {}", service); } - @Override public Mono getChatRoom(UUID id) { return service - .getChatRoom(shard, id) + .getChatRoom(id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } - @Override - public Flux getChatRooms() + public Flux getChatRooms(int shard) { return service.getChatRooms(shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 19ff4aa4..fc5ba384 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,6 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono getChatRoom(int shard, UUID id); + Mono getChatRoom(UUID id); Flux getChatRooms(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java index 17d94eb4..d467eabb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java @@ -3,7 +3,8 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; import java.util.Arrays; -import java.util.UUID; +import java.util.Collection; +import java.util.Iterator; import java.util.stream.Collectors; @@ -19,6 +20,19 @@ public class ShardNotOwnedException extends IllegalStateException private final int[] ownedShards; + public ShardNotOwnedException( + ChatHomeService chatHomeService, + ChatRoomInfo chatRoomInfo, + int shard, + Collection ownedShards) + { + this( + chatHomeService, + chatRoomInfo, + shard, + ShardNotOwnedException.toArray(ownedShards)); + } + public ShardNotOwnedException( ChatHomeService chatHomeService, ChatRoomInfo chatRoomInfo, @@ -41,4 +55,14 @@ public class ShardNotOwnedException extends IllegalStateException this.shard = shard; this.ownedShards = ownedShards; } + + + private static int[] toArray(Collection collection) + { + int[] array = new int[collection.size()]; + Iterator iterator = collection.iterator(); + for (int i = 0; iterator.hasNext(); i++) + array[i] = iterator.next(); + return array; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 33b63d74..5e1f7dc0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -1,8 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,18 +63,31 @@ public class InMemoryChatHomeService implements ChatHomeService UUID id = chatRoom.getId(); int shard = shardingStrategy.selectShard(id); if (!ownedShards.contains(shard)) - throw new IllegalStateException( - this, - chatRoom, - shard, - ownedShards.stream().toArray()); + throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards); chatrooms[shard].put(id, chatRoom); } @Override - public Mono getChatRoom(int shard, UUID id) + public Mono getChatRoom(UUID id) { - return Mono.justOrEmpty(chatrooms[shard].get(id)); + int shard = shardingStrategy.selectShard(id); + if (ownedShards.contains(shard)) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + else + { + int[] ownedShards = new int[this.ownedShards.size()]; + Iterator iterator = this.ownedShards.iterator(); + for (int i = 0; iterator.hasNext(); i++) + { + ownedShards[i] = iterator.next(); + } + return Mono.error(new UnknownChatroomException( + id, + shard, + ownedShards)); + } } @Override @@ -84,10 +95,4 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux.fromStream(chatrooms[shard].values().stream()); } - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } }