refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code
authorKai Moritz <kai@juplo.de>
Mon, 4 Sep 2023 20:14:40 +0000 (22:14 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 4 Sep 2023 20:14:40 +0000 (22:14 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java

index cc924d7..9fa675a 100644 (file)
@@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @Autowired
        ChatBackendProperties properties;
        @Autowired
-       ChatHome[] chatHomes;
+       ChatHome chatHome;
        @Autowired
        StorageStrategy storageStrategy;
 
@@ -32,8 +32,15 @@ public class ChatBackendApplication implements WebFluxConfigurer
        @PreDestroy
        public void onExit()
        {
-               for (int shard = 0; shard < chatHomes.length; shard++)
-                       storageStrategy.writeChatRoomData(chatHomes[shard].getChatRoomData());
+               storageStrategy.writeChatRoomInfo(
+                               chatHome
+                                               .getChatRoomInfo()
+                                               .doOnNext(chatRoomInfo ->
+                                                               storageStrategy.writeChatRoomData(
+                                                                               chatRoomInfo.getId(),
+                                                                               chatHome
+                                                                                               .getChatRoomData(chatRoomInfo.getId())
+                                                                                               .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
        }
 
        public static void main(String[] args)
index 257c6db..45e93cc 100644 (file)
@@ -35,7 +35,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomData>[] chatrooms;
+  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+  private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
   @Getter
@@ -65,10 +66,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatrooms = new Map[numShards];
+    this.chatRoomInfo = new Map[numShards];
+    this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+        .forEach(shard ->
+        {
+          this.chatRoomInfo[shard] = new HashMap<>();
+          this.chatRoomData[shard] = new HashMap<>();
+        });
   }
 
 
@@ -277,27 +283,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
-      int partition)
+      Integer partition)
   {
-    log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+    log.info(
+        "Loading ChatRoom {} for shard {} with buffer-size {}",
+        chatRoomId,
+        partition,
+        bufferSize);
     KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
     ChatRoomData chatRoomData = new ChatRoomData(
         clock,
         service,
         bufferSize);
-    putChatRoom(chatRoomData);
+    putChatRoom(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition,
+        chatRoomData);
   }
 
 
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();
-    String name = chatRoomInfo.getName();
-    int shard = chatRoomInfo.getShard();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(chatRoomData);
+    putChatRoom(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getName(),
+        chatRoomInfo.getShard(),
+        chatRoomData);
   }
 
   private void loadChatMessage(
@@ -310,7 +326,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoomData chatRoomData = chatrooms[partition].get(chatRoomId);
+    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
     KafkaChatRoomService kafkaChatRoomService =
         (KafkaChatRoomService) chatRoomData.getChatRoomService();
 
@@ -335,13 +351,18 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void putChatRoom(ChatRoomData chatRoomData)
+  private void putChatRoom(
+      UUID chatRoomId,
+      String name,
+      Integer partition,
+      ChatRoomData chatRoomData)
   {
-    Integer partition = chatRoomData.getShard();
-    UUID chatRoomId = chatRoomData.getId();
-    if (chatrooms[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
     {
-      log.warn("Ignoring existing chat-room: " + chatRoomData);
+      log.warn(
+          "Ignoring existing chat-room for {}: {}",
+          partition,
+          chatRoomId);
     }
     else
     {
@@ -350,7 +371,10 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
           partition,
           chatRoomData);
 
-      chatrooms[partition].put(chatRoomId, chatRoomData);
+      this.chatRoomInfo[partition].put(
+          chatRoomId,
+          new ChatRoomInfo(chatRoomId, name, partition));
+      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
     }
   }
 
@@ -374,7 +398,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
+    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+  }
+
+  Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+        .filter(shard -> isShardOwned[shard])
+        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+  }
+
+  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+  {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
+    if (!isShardOwned[shard])
+    {
+      return Mono.error(new ShardNotOwnedException(shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
   }
 
   Flux<ChatRoomData> getChatRoomData()
@@ -382,6 +429,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     return Flux
         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
         .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
   }
 }
index c2e95d4..5f63bf2 100644 (file)
@@ -34,7 +34,7 @@ public class KafkaChatHome implements ChatHome
   {
     int shard = selectShard(id);
     return chatRoomChannel
-        .getChatRoomData(shard, id)
+        .getChatRoomInfo(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
@@ -44,7 +44,7 @@ public class KafkaChatHome implements ChatHome
   @Override
   public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return chatRoomChannel.getChatRoomData();
+    return chatRoomChannel.getChatRoomInfo();
   }
 
   @Override