X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=78665dee8adf8358e92049c976ab3fb68f42fae2;hb=cbc0ed5c9bf21fc08ca4312627342738e331c634;hp=7620461785e0e0ed132971bd1cce3b8c6f363c80;hpb=13c51b4630177e7f6649500a3d4b876a12509af6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 76204617..78665dee 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -249,7 +249,21 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener void putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } } Mono getChatRoom(int shard, UUID id)