X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Finmemory%2FSimpleChatHome.java;h=868c01e8697d6b330f47761da3e85fd248f9f028;hb=df207aa9a8cd349fd43785270d250a7f55593801;hp=c2d25b201750af9638c667ab2fb18d0fb72a8ea2;hpb=a39837c0ddf444dd98b371eaf8226ad865543519;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index c2d25b20..868c01e8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,6 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,34 +15,41 @@ import java.util.*; public class SimpleChatHome implements ChatHome { private final Integer shard; - private final Map chatRooms; + private final Map chatRoomInfo; + private final Map chatRoomData; private final Clock clock; private final int bufferSize; public SimpleChatHome( - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { - this(null, chatroomFlux, clock, bufferSize); + this( + null, + storageStrategy, + clock, + bufferSize); } public SimpleChatHome( Integer shard, - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); ; this.shard = shard; - this.chatRooms = new HashMap<>(); - chatroomFlux - .filter(chatRoom -> + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> { - if (shard == null || chatRoom.getShard() == shard) + if (shard == null || info.getShard() == shard) { return true; } @@ -49,12 +58,24 @@ public class SimpleChatHome implements ChatHome log.info( "SimpleChatHome for shard {} ignores not owned chat-room {}", shard, - chatRoom); + info); return false; } }) .toStream() - .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); this.clock = clock; this.bufferSize = bufferSize; } @@ -65,22 +86,37 @@ public class SimpleChatHome implements ChatHome { log.info("Creating ChatRoom with buffer-size {}", bufferSize); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatRooms.put(id, chatRoom); - return Mono.just(chatRoom); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { return Mono - .justOrEmpty(chatRooms.get(id)) + .justOrEmpty(chatRoomInfo.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override - public Flux getChatRooms() + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() { - return Flux.fromIterable(chatRooms.values()); + return Flux.fromIterable(chatRoomData.values()); } }