WIP rebase--2023-08-20
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 09:39:41 +0000 (11:39 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 09:39:41 +0000 (11:39 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java

index 0b35f8c..7f93ad8 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
@@ -238,7 +239,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
@@ -276,7 +277,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  void createChatRoom(
+  private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
       int partition)
@@ -294,7 +295,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  void createChatRoom(ChatRoomInfo chatRoomInfo)
+  private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();
     String name = chatRoomInfo.getName();
@@ -305,7 +306,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     putChatRoom(chatRoom);
   }
 
-  void loadChatMessage(
+  private void loadChatMessage(
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
@@ -322,7 +323,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     kafkaChatRoomService.persistMessage(message);
   }
 
-  boolean isLoadingCompleted()
+  private boolean isLoadingCompleted()
   {
     return IntStream
         .range(0, numShards)
@@ -330,7 +331,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
   }
 
-  void pauseAllOwnedPartions()
+  private void pauseAllOwnedPartions()
   {
     consumer.pause(IntStream
         .range(0, numShards)
@@ -361,6 +362,16 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
   Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
+    if (loadInProgress)
+    {
+      throw new LoadInProgressException(shard);
+    }
+
+    if (!isShardOwned[shard])
+    {
+      throw new ShardNotOwnedException(shard);
+    }
+
     return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
 
index 324e80b..745abca 100644 (file)
@@ -23,14 +23,7 @@ public class KafkaChatHome implements ChatHome
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     int shard = shardingStrategy.selectShard(id);
-    if (chatRoomChannel.isLoadInProgress())
-    {
-      throw new LoadInProgressException(shard);
-    }
-    else
-    {
-      return chatRoomChannel.getChatRoom(shard, id);
-    }
+    return chatRoomChannel.getChatRoom(shard, id);
   }
 
   @Override