@Slf4j
-public class ChatHome implements ChatHome
+public class ChatHome
{
private final ChatHomeService service;
- private final int shard;
- public ChatHome(ChatHomeService service, int shard)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
- this.shard = shard;
- }
-
public ChatHome(ChatHomeService service)
{
- this(service, 0);
+ log.info("Created ChatHome with ChatHomeService {}", service);
}
- @Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
return service
- .getChatRoom(shard, id)
+ .getChatRoom(id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
- @Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoom> getChatRooms(int shard)
{
return service.getChatRooms(shard);
}
public interface ChatHomeService
{
- Mono<ChatRoom> getChatRoom(int shard, UUID id);
+ Mono<ChatRoom> getChatRoom(UUID id);
Flux<ChatRoom> getChatRooms(int shard);
}
import lombok.Getter;
import java.util.Arrays;
-import java.util.UUID;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.stream.Collectors;
private final int[] ownedShards;
+ public ShardNotOwnedException(
+ ChatHomeService chatHomeService,
+ ChatRoomInfo chatRoomInfo,
+ int shard,
+ Collection<Integer> ownedShards)
+ {
+ this(
+ chatHomeService,
+ chatRoomInfo,
+ shard,
+ ShardNotOwnedException.toArray(ownedShards));
+ }
+
public ShardNotOwnedException(
ChatHomeService chatHomeService,
ChatRoomInfo chatRoomInfo,
this.shard = shard;
this.ownedShards = ownedShards;
}
+
+
+ private static int[] toArray(Collection<Integer> collection)
+ {
+ int[] array = new int[collection.size()];
+ Iterator<Integer> iterator = collection.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ array[i] = iterator.next();
+ return array;
+ }
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
UUID id = chatRoom.getId();
int shard = shardingStrategy.selectShard(id);
if (!ownedShards.contains(shard))
- throw new IllegalStateException(
- this,
- chatRoom,
- shard,
- ownedShards.stream().toArray());
+ throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
chatrooms[shard].put(id, chatRoom);
}
@Override
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ public Mono<ChatRoom> getChatRoom(UUID id)
{
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ int shard = shardingStrategy.selectShard(id);
+ if (ownedShards.contains(shard))
+ {
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+ else
+ {
+ int[] ownedShards = new int[this.ownedShards.size()];
+ Iterator<Integer> iterator = this.ownedShards.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ {
+ ownedShards[i] = iterator.next();
+ }
+ return Mono.error(new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards));
+ }
}
@Override
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
}