NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
index 7620461..230f822 100644 (file)
@@ -249,7 +249,37 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   void putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    Integer partition = chatRoom.getShard();
+    UUID chatRoomId = chatRoom.getId();
+    ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId);
+    if (existingChatRoom == null)
+    {
+      log.info(
+          "Creating new chat-room in partition {}: {}",
+          partition,
+          chatRoom);
+      chatrooms[partition].put(chatRoomId, chatRoom);
+    }
+    else
+    {
+      if (chatRoom.getShard() != existingChatRoom.getShard())
+      {
+        throw new IllegalArgumentException(
+            "Could not change the shard of existing chat-room " +
+            chatRoomId + " from " +
+            existingChatRoom.getShard() + " to " +
+            chatRoom.getShard());
+      }
+      else
+      {
+        log.info(
+            "Updating chat-room in partition {}: {} -> {}",
+            partition,
+            existingChatRoom,
+            chatRoom);
+        existingChatRoom.s
+      }
+    }
   }
 
   Mono<ChatRoom> getChatRoom(int shard, UUID id)