From: Kai Moritz Date: Wed, 28 Feb 2024 10:50:11 +0000 (+0100) Subject: fix: Fixed `ConcurrentModificationException` when accessing a chat-room X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8dbdc24311780e8d3829197975b3f20dbfa82616;p=demos%2Fkafka%2Fchat fix: Fixed `ConcurrentModificationException` when accessing a chat-room * If a new chat-room was created, `InfoChannel` only reacted with the creation of the according `ChatRoomInfo`-instance. * The creation of the accompanying `ChatRoomData`-instance through `DataChannel` was posponed until the new chat-room was accessed the first time. * That way, `InfoChannel` did not need to know `DataChannel`, so that a cyclic dependency could be avoided. * As a downside, this approach was open to a race-condition: if several accesses to the newly created chat-room happend in parallel, a `ConcurrentModificationException` was thrown, since the instance of `ChatRoomData` was created multiple times in parallel. * To circumvent the locking, that would be necesarry to evade this race condition, the approach was refactored, so that `InfoChannel` now explicitly triggers the creation of the `ChatRoomData`-instance. * To do so without introducing a cyclic dependency, the class `ChannelMediator` was introduced, so that `InfoChannel` and `DataChannel` need not to know each other. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java index d6e028a3..10176d32 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java @@ -13,6 +13,8 @@ public class ChannelMediator { @Setter private InfoChannel infoChannel; + @Setter + private DataChannel dataChannel; void shardAssigned(int shard) @@ -29,4 +31,9 @@ public class ChannelMediator { return infoChannel.getChatRoomInfo(id); } + + void chatRoomCreated(ChatRoomInfo chatRoomInfo) + { + dataChannel.createChatRoomData(chatRoomInfo); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 64117f59..e1754a15 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -267,9 +267,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this - .chatRoomData[partition] - .computeIfAbsent(chatRoomId, this::computeChatRoomData); + ChatRoomData chatRoomData = computeChatRoomData(chatRoomId, partition); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -312,6 +310,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .toArray(); } + void createChatRoomData(ChatRoomInfo chatRoomInfo) + { + computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard()); + } + Mono getChatRoomData(int shard, UUID id) { if (loadInProgress) @@ -324,17 +327,28 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(instanceId, shard)); } - return channelMediator - .getChatRoomInfo(id) - .map(chatRoomInfo -> - chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); + return Mono.justOrEmpty(chatRoomData[shard].get(id)); } - private ChatRoomData computeChatRoomData(UUID chatRoomId) + private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard) { - log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - return new ChatRoomData(clock, service, bufferSize); + ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId); + + if (chatRoomData != null) + { + log.info( + "Ignoring request to create already existing ChatRoomData for {}", + chatRoomId); + } + else + { + log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData[shard].put(chatRoomId, chatRoomData); + } + + return chatRoomData; } ConsumerGroupMetadata getConsumerGroupMetadata() diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index a6351d0f..f3150ce2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -39,6 +39,7 @@ public class InfoChannel implements Runnable private final long[] nextOffset; private final Map chatRoomInfo; private final String instanceUri; + private final ChannelMediator channelMediator; private boolean running; @Getter @@ -51,7 +52,8 @@ public class InfoChannel implements Runnable Consumer infoChannelConsumer, Duration pollingInterval, int numShards, - URI instanceUri) + URI instanceUri, + ChannelMediator channelMediator) { log.debug( "Creating InfoChannel for topic {}", @@ -72,6 +74,8 @@ public class InfoChannel implements Runnable .forEach(partition -> this.nextOffset[partition] = -1l); this.instanceUri = instanceUri.toASCIIString(); + + this.channelMediator = channelMediator; } @@ -282,6 +286,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.channelMediator.chatRoomCreated(chatRoomInfo); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index f78beb10..33371279 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -133,7 +133,8 @@ public class KafkaServicesConfiguration infoChannelConsumer, properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), - properties.getKafka().getInstanceUri()); + properties.getKafka().getInstanceUri(), + channelMediator); channelMediator.setInfoChannel(infoChannel); return infoChannel; } @@ -148,7 +149,7 @@ public class KafkaServicesConfiguration ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { - return new DataChannel( + DataChannel dataChannel = new DataChannel( properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, @@ -160,6 +161,8 @@ public class KafkaServicesConfiguration clock, channelMediator, shardingPublisherStrategy); + channelMediator.setDataChannel(dataChannel); + return dataChannel; } @Bean