X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Finmemory%2FSimpleChatHome.java;h=868c01e8697d6b330f47761da3e85fd248f9f028;hb=df207aa9a8cd349fd43785270d250a7f55593801;hp=f99bc9d8c1e49997c6b9635a5569433f6fb45dd7;hpb=069af796af1880f9f02fdecd484d3e4f63675a02;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index f99bc9d8..868c01e8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,49 +1,122 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Clock; import java.util.*; @Slf4j public class SimpleChatHome implements ChatHome { - private final InMemoryChatHomeService service; - private final int shard; + private final Integer shard; + private final Map chatRoomInfo; + private final Map chatRoomData; + private final Clock clock; + private final int bufferSize; - public SimpleChatHome(InMemoryChatHomeService service, int shard) + + public SimpleChatHome( + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) + { + this( + null, + storageStrategy, + clock, + bufferSize); + } + + public SimpleChatHome( + Integer shard, + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; +; this.shard = shard; + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> + { + if (shard == null || info.getShard() == shard) + { + return true; + } + else + { + log.info( + "SimpleChatHome for shard {} ignores not owned chat-room {}", + shard, + info); + return false; + } + }) + .toStream() + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); + this.clock = clock; + this.bufferSize = bufferSize; } - public SimpleChatHome(InMemoryChatHomeService service) + + @Override + public Mono createChatRoom(UUID id, String name) { - this(service, 0); + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); } + @Override + public Mono getChatRoomInfo(UUID id) + { + return Mono + .justOrEmpty(chatRoomInfo.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } @Override - public Mono getChatRoom(UUID id) + public Flux getChatRoomInfo() { - return service - .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - service.getOwnedShards()))); + return Flux.fromIterable(chatRoomInfo.values()); } @Override - public Flux getChatRooms() + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() { - return service.getChatRooms(shard); + return Flux.fromIterable(chatRoomData.values()); } }