X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatRoomChannel.java;h=7e95c648ac517844df0dd58104089934444b89e5;hb=df207aa9a8cd349fd43785270d250a7f55593801;hp=7659d1e1d5306e37937b2fe9052f6ec84167b27e;hpb=b19933a21840d78c7ebb94defda572ce94ea954c;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 7659d1e1..7e95c648 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -1,9 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; @@ -38,7 +37,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatrooms; + private final Map[] chatRoomInfo; + private final Map[] chatRoomData; private boolean running; @Getter @@ -68,10 +68,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; + this.chatRoomInfo = new Map[numShards]; + this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + .forEach(shard -> + { + this.chatRoomInfo[shard] = new HashMap<>(); + this.chatRoomData[shard] = new HashMap<>(); + }); } @@ -280,30 +285,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, - int partition) + Integer partition) { - log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); - ChatRoom chatRoom = new ChatRoom( + log.info( + "Loading ChatRoom {} for shard {} with buffer-size {}", chatRoomId, - createChatRoomRequestTo.getName(), partition, + bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoomData chatRoomData = new ChatRoomData( clock, service, bufferSize); - putChatRoom(chatRoom); + putChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + chatRoomData); } private void createChatRoom(ChatRoomInfo chatRoomInfo) { UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - putChatRoom(chatRoom); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + putChatRoom( + chatRoomInfo.getId(), + chatRoomInfo.getName(), + chatRoomInfo.getShard(), + chatRoomData); } private void loadChatMessage( @@ -316,9 +328,9 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); + (KafkaChatRoomService) chatRoomData.getChatRoomService(); kafkaChatRoomService.persistMessage(message); } @@ -341,22 +353,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom(ChatRoom chatRoom) + private void putChatRoom( + UUID chatRoomId, + String name, + Integer partition, + ChatRoomData chatRoomData) { - Integer partition = chatRoom.getShard(); - UUID chatRoomId = chatRoom.getId(); - if (chatrooms[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo[partition].containsKey(chatRoomId)) { - log.warn("Ignoring existing chat-room: " + chatRoom); + log.warn( + "Ignoring existing chat-room for {}: {}", + partition, + chatRoomId); } else { log.info( "Adding new chat-room to partition {}: {}", partition, - chatRoom); + chatRoomData); - chatrooms[partition].put(chatRoomId, chatRoom); + this.chatRoomInfo[partition].put( + chatRoomId, + new ChatRoomInfo(chatRoomId, name, partition)); + this.chatRoomData[partition].put(chatRoomId, chatRoomData); } } @@ -368,26 +388,49 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener .toArray(); } - Mono getChatRoom(int shard, UUID id) + Mono getChatRoomData(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomData[shard].get(id)); + } + + Flux getChatRoomInfo() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + } + + Mono getChatRoomInfo(int shard, UUID id) { if (loadInProgress) { - throw new LoadInProgressException(shard); + return Mono.error(new LoadInProgressException()); } if (!isShardOwned[shard]) { - throw new ShardNotOwnedException(shard); + return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatrooms[shard].get(id)); + return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); } - Flux getChatRooms() + Flux getChatRoomData() { return Flux .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values())); } }