--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+public class ShardNotOwnedException extends IllegalStateException
+{
+ @Getter
+ private final ChatHomeService chatHomeService;
+ @Getter
+ private final ChatRoomInfo chatRoomInfo;
+ @Getter
+ private final int shard;
+ @Getter
+ private final int[] ownedShards;
+
+
+ public ShardNotOwnedException(
+ ChatHomeService chatHomeService,
+ ChatRoomInfo chatRoomInfo,
+ int shard,
+ int[] ownedShards)
+ {
+ super(
+ chatHomeService +
+ " does not own the shard " +
+ shard +
+ " for ChatRoom " +
+ chatRoomInfo +
+ " owned shards: " +
+ Arrays
+ .stream(ownedShards)
+ .mapToObj(ownedShard -> Integer.toString(ownedShard))
+ .collect(Collectors.joining(", ")));
+ this.chatHomeService = chatHomeService;
+ this.chatRoomInfo = chatRoomInfo;
+ this.shard = shard;
+ this.ownedShards = ownedShards;
+ }
+}
public InMemoryChatHomeService(
int numShards,
int[] ownedShards,
+ ShardingStrategy shardingStrategy,
Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating InMemoryChatHomeService");
+
this.chatrooms = new Map[numShards];
- Set<Integer> owned = Arrays
+
+ this.ownedShards = Arrays
.stream(ownedShards)
.collect(
() -> new HashSet<>(),
(set, i) -> set.add(i),
(a, b) -> a.addAll(b));
+
+ this.shardingStrategy = shardingStrategy;
+
for (int shard = 0; shard < numShards; shard++)
{
- chatrooms[shard] = owned.contains(shard)
+ chatrooms[shard] = this.ownedShards.contains(shard)
? new HashMap<>()
: null;
}
chatroomFlux
.filter(chatRoom ->
{
- if (owned.contains(chatRoom.getShard()))
+ if (this.ownedShards.contains(chatRoom.getShard()))
{
return true;
}
.forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
- public void putChatRoom(ChatRoom chatRoom)
+ void putChatRoom(ChatRoom chatRoom)
{
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ UUID id = chatRoom.getId();
+ int shard = shardingStrategy.selectShard(id);
+ if (!ownedShards.contains(shard))
+ throw new IllegalStateException(
+ this,
+ chatRoom,
+ shard,
+ ownedShards.stream().toArray());
+ chatrooms[shard].put(id, chatRoom);
}
@Override