refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
index 6d2f079..4c8b2fa 100644 (file)
@@ -1,5 +1,9 @@
-package de.juplo.kafka.chat.backend.domain;
+package de.juplo.kafka.chat.backend.persistence.inmemory;
 
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,13 +17,13 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHome implements ChatHome
 {
-  private final ChatHome[] chatHomes;
+  private final SimpleChatHome[] chatHomes;
   private final Set<Integer> ownedShards;
   private final ShardingStrategy shardingStrategy;
 
 
   public  ShardedChatHome(
-      ChatHome[] chatHomes,
+      SimpleChatHome[] chatHomes,
       ShardingStrategy shardingStrategy)
   {
     this.chatHomes = chatHomes;
@@ -38,22 +42,62 @@ public class ShardedChatHome implements ChatHome
 
 
   @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard].createChatRoom(id, name);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
     int shard = selectShard(id);
-    if (chatHomes[shard] == null)
-      throw new ShardNotOwnedException(shard);
-    return chatHomes[shard].getChatRoom(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomInfo(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+            ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+            : throwable);
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoomInfo> getChatRoomInfo()
   {
     return Flux
         .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRooms());
+        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
   }
 
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomData(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+                ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+                : throwable);
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomData());
+  }
+
+
 
   private int selectShard(UUID chatroomId)
   {