NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
index 230f822..78665de 100644 (file)
@@ -251,35 +251,19 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   {
     Integer partition = chatRoom.getShard();
     UUID chatRoomId = chatRoom.getId();
-    ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId);
-    if (existingChatRoom == null)
+    if (chatrooms[partition].containsKey(chatRoomId))
+    {
+      log.warn("Ignoring existing chat-room: " + chatRoom);
+    }
+    else
     {
       log.info(
-          "Creating new chat-room in partition {}: {}",
+          "Adding new chat-room to partition {}: {}",
           partition,
           chatRoom);
+
       chatrooms[partition].put(chatRoomId, chatRoom);
     }
-    else
-    {
-      if (chatRoom.getShard() != existingChatRoom.getShard())
-      {
-        throw new IllegalArgumentException(
-            "Could not change the shard of existing chat-room " +
-            chatRoomId + " from " +
-            existingChatRoom.getShard() + " to " +
-            chatRoom.getShard());
-      }
-      else
-      {
-        log.info(
-            "Updating chat-room in partition {}: {} -> {}",
-            partition,
-            existingChatRoom,
-            chatRoom);
-        existingChatRoom.s
-      }
-    }
   }
 
   Mono<ChatRoom> getChatRoom(int shard, UUID id)