X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=78665dee8adf8358e92049c976ab3fb68f42fae2;hb=cbc0ed5c9bf21fc08ca4312627342738e331c634;hp=230f8226daffd832b81ddd03b013bcff8287aa12;hpb=716ffa6a7665496e614ce6a1671c8e49c562a4c2;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 230f8226..78665dee 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -251,35 +251,19 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { Integer partition = chatRoom.getShard(); UUID chatRoomId = chatRoom.getId(); - ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId); - if (existingChatRoom == null) + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else { log.info( - "Creating new chat-room in partition {}: {}", + "Adding new chat-room to partition {}: {}", partition, chatRoom); + chatrooms[partition].put(chatRoomId, chatRoom); } - else - { - if (chatRoom.getShard() != existingChatRoom.getShard()) - { - throw new IllegalArgumentException( - "Could not change the shard of existing chat-room " + - chatRoomId + " from " + - existingChatRoom.getShard() + " to " + - chatRoom.getShard()); - } - else - { - log.info( - "Updating chat-room in partition {}: {} -> {}", - partition, - existingChatRoom, - chatRoom); - existingChatRoom.s - } - } } Mono getChatRoom(int shard, UUID id)