NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
index 7620461..78665de 100644 (file)
@@ -249,7 +249,21 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   void putChatRoom(ChatRoom chatRoom)
   {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    Integer partition = chatRoom.getShard();
+    UUID chatRoomId = chatRoom.getId();
+    if (chatrooms[partition].containsKey(chatRoomId))
+    {
+      log.warn("Ignoring existing chat-room: " + chatRoom);
+    }
+    else
+    {
+      log.info(
+          "Adding new chat-room to partition {}: {}",
+          partition,
+          chatRoom);
+
+      chatrooms[partition].put(chatRoomId, chatRoom);
+    }
   }
 
   Mono<ChatRoom> getChatRoom(int shard, UUID id)