X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Finmemory%2FSimpleChatHome.java;h=c8ddbf9d9132a71e3701f62ae17519060d4e981a;hb=f7c3b95145be5e4a0c6d0139ead7ca755b0fbdff;hp=c2d25b201750af9638c667ab2fb18d0fb72a8ea2;hpb=a39837c0ddf444dd98b371eaf8226ad865543519;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index c2d25b20..c8ddbf9d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,34 +14,41 @@ import java.util.*; public class SimpleChatHome implements ChatHome { private final Integer shard; - private final Map chatRooms; + private final Map chatRoomInfo; + private final Map chatRoomData; private final Clock clock; private final int bufferSize; public SimpleChatHome( - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { - this(null, chatroomFlux, clock, bufferSize); + this( + null, + storageStrategy, + clock, + bufferSize); } public SimpleChatHome( Integer shard, - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); ; this.shard = shard; - this.chatRooms = new HashMap<>(); - chatroomFlux - .filter(chatRoom -> + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> { - if (shard == null || chatRoom.getShard() == shard) + if (shard == null || info.getShard() == shard) { return true; } @@ -49,12 +57,24 @@ public class SimpleChatHome implements ChatHome log.info( "SimpleChatHome for shard {} ignores not owned chat-room {}", shard, - chatRoom); + info); return false; } }) .toStream() - .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); this.clock = clock; this.bufferSize = bufferSize; } @@ -65,22 +85,37 @@ public class SimpleChatHome implements ChatHome { log.info("Creating ChatRoom with buffer-size {}", bufferSize); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatRooms.put(id, chatRoom); - return Mono.just(chatRoom); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { return Mono - .justOrEmpty(chatRooms.get(id)) + .justOrEmpty(chatRoomInfo.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override - public Flux getChatRooms() + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() { - return Flux.fromIterable(chatRooms.values()); + return Flux.fromIterable(chatRoomData.values()); } }