refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
index 7659d1e..7e95c64 100644 (file)
@@ -1,9 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
@@ -38,7 +37,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoom>[] chatrooms;
+  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+  private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
   @Getter
@@ -68,10 +68,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatrooms = new Map[numShards];
+    this.chatRoomInfo = new Map[numShards];
+    this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+        .forEach(shard ->
+        {
+          this.chatRoomInfo[shard] = new HashMap<>();
+          this.chatRoomData[shard] = new HashMap<>();
+        });
   }
 
 
@@ -280,30 +285,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
-      int partition)
+      Integer partition)
   {
-    log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
-    ChatRoom chatRoom = new ChatRoom(
+    log.info(
+        "Loading ChatRoom {} for shard {} with buffer-size {}",
         chatRoomId,
-        createChatRoomRequestTo.getName(),
         partition,
+        bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    ChatRoomData chatRoomData = new ChatRoomData(
         clock,
         service,
         bufferSize);
-    putChatRoom(chatRoom);
+    putChatRoom(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition,
+        chatRoomData);
   }
 
 
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();
-    String name = chatRoomInfo.getName();
-    int shard = chatRoomInfo.getShard();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    putChatRoom(chatRoom);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    putChatRoom(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getName(),
+        chatRoomInfo.getShard(),
+        chatRoomData);
   }
 
   private void loadChatMessage(
@@ -316,9 +328,9 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
     KafkaChatRoomService kafkaChatRoomService =
-        (KafkaChatRoomService) chatRoom.getChatRoomService();
+        (KafkaChatRoomService) chatRoomData.getChatRoomService();
 
     kafkaChatRoomService.persistMessage(message);
   }
@@ -341,22 +353,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void putChatRoom(ChatRoom chatRoom)
+  private void putChatRoom(
+      UUID chatRoomId,
+      String name,
+      Integer partition,
+      ChatRoomData chatRoomData)
   {
-    Integer partition = chatRoom.getShard();
-    UUID chatRoomId = chatRoom.getId();
-    if (chatrooms[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
     {
-      log.warn("Ignoring existing chat-room: " + chatRoom);
+      log.warn(
+          "Ignoring existing chat-room for {}: {}",
+          partition,
+          chatRoomId);
     }
     else
     {
       log.info(
           "Adding new chat-room to partition {}: {}",
           partition,
-          chatRoom);
+          chatRoomData);
 
-      chatrooms[partition].put(chatRoomId, chatRoom);
+      this.chatRoomInfo[partition].put(
+          chatRoomId,
+          new ChatRoomInfo(chatRoomId, name, partition));
+      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
     }
   }
 
@@ -368,26 +388,49 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
         .toArray();
   }
 
-  Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+  {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
+    if (!isShardOwned[shard])
+    {
+      return Mono.error(new ShardNotOwnedException(shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+  }
+
+  Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+        .filter(shard -> isShardOwned[shard])
+        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+  }
+
+  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
   {
     if (loadInProgress)
     {
-      throw new LoadInProgressException(shard);
+      return Mono.error(new LoadInProgressException());
     }
 
     if (!isShardOwned[shard])
     {
-      throw new ShardNotOwnedException(shard);
+      return Mono.error(new ShardNotOwnedException(shard));
     }
 
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
+    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
   }
 
-  Flux<ChatRoom> getChatRooms()
+  Flux<ChatRoomData> getChatRoomData()
   {
     return Flux
         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
         .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
   }
 }