From: Kai Moritz Date: Mon, 4 Sep 2023 20:14:40 +0000 (+0200) Subject: refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code X-Git-Tag: rebase--2023-09-05--23-53~18 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5a2330e1f34db44251a90980b19ffaed946039da;p=demos%2Fkafka%2Fchat refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index cc924d79..9fa675a0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome[] chatHomes; + ChatHome chatHome; @Autowired StorageStrategy storageStrategy; @@ -32,8 +32,15 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.writeChatRoomData(chatHomes[shard].getChatRoomData()); + storageStrategy.writeChatRoomInfo( + chatHome + .getChatRoomInfo() + .doOnNext(chatRoomInfo -> + storageStrategy.writeChatRoomData( + chatRoomInfo.getId(), + chatHome + .getChatRoomData(chatRoomInfo.getId()) + .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 257c6db5..45e93ccc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -35,7 +35,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatrooms; + private final Map[] chatRoomInfo; + private final Map[] chatRoomData; private boolean running; @Getter @@ -65,10 +66,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; + this.chatRoomInfo = new Map[numShards]; + this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + .forEach(shard -> + { + this.chatRoomInfo[shard] = new HashMap<>(); + this.chatRoomData[shard] = new HashMap<>(); + }); } @@ -277,27 +283,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, - int partition) + Integer partition) { - log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + log.info( + "Loading ChatRoom {} for shard {} with buffer-size {}", + chatRoomId, + partition, + bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); ChatRoomData chatRoomData = new ChatRoomData( clock, service, bufferSize); - putChatRoom(chatRoomData); + putChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + chatRoomData); } private void createChatRoom(ChatRoomInfo chatRoomInfo) { UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, id); ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom(chatRoomData); + putChatRoom( + chatRoomInfo.getId(), + chatRoomInfo.getName(), + chatRoomInfo.getShard(), + chatRoomData); } private void loadChatMessage( @@ -310,7 +326,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = chatrooms[partition].get(chatRoomId); + ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); KafkaChatRoomService kafkaChatRoomService = (KafkaChatRoomService) chatRoomData.getChatRoomService(); @@ -335,13 +351,18 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom(ChatRoomData chatRoomData) + private void putChatRoom( + UUID chatRoomId, + String name, + Integer partition, + ChatRoomData chatRoomData) { - Integer partition = chatRoomData.getShard(); - UUID chatRoomId = chatRoomData.getId(); - if (chatrooms[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo[partition].containsKey(chatRoomId)) { - log.warn("Ignoring existing chat-room: " + chatRoomData); + log.warn( + "Ignoring existing chat-room for {}: {}", + partition, + chatRoomId); } else { @@ -350,7 +371,10 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener partition, chatRoomData); - chatrooms[partition].put(chatRoomId, chatRoomData); + this.chatRoomInfo[partition].put( + chatRoomId, + new ChatRoomInfo(chatRoomId, name, partition)); + this.chatRoomData[partition].put(chatRoomId, chatRoomData); } } @@ -374,7 +398,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatrooms[shard].get(id)); + return Mono.justOrEmpty(chatRoomData[shard].get(id)); + } + + Flux getChatRoomInfo() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + } + + Mono getChatRoomInfo(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); } Flux getChatRoomData() @@ -382,6 +429,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Flux .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values())); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index c2e95d4d..5f63bf2b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -34,7 +34,7 @@ public class KafkaChatHome implements ChatHome { int shard = selectShard(id); return chatRoomChannel - .getChatRoomData(shard, id) + .getChatRoomInfo(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, @@ -44,7 +44,7 @@ public class KafkaChatHome implements ChatHome @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomData(); + return chatRoomChannel.getChatRoomInfo(); } @Override