X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2FInMemoryChatHomeService.java;h=b2f94ec3da7a418d21a19d8089230d222331d6b4;hb=aa0efd1151673c5f0f1576c3026f6fdd0dfad691;hp=41ecd4cffd8581bfc2e23ddcaa667a3bbcc5cf47;hpb=082c283842990180b23f8a35fb3dfa4ebc8a5189;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java index 41ecd4cf..b2f94ec3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java @@ -1,32 +1,57 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.Chatroom; -import de.juplo.kafka.chat.backend.domain.ChatroomFactory; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import java.time.Clock; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.Map; import java.util.UUID; -@RequiredArgsConstructor -public class InMemoryChatroomFactory implements ChatroomFactory +@Slf4j +public class InMemoryChatHomeService implements ChatHomeService { + private final Map chatrooms; + private final Clock clock; private final int bufferSize; + public InMemoryChatHomeService( + Flux chatroomFlux, + Clock clock, + int bufferSize) + { + log.debug("Creating InMemoryChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize); + this.chatrooms = new HashMap<>(); + chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); + this.clock = clock; + this.bufferSize = bufferSize; + } + + @Override + public Mono createChatRoom(UUID id, String name) + { + InMemoryChatRoomService service = + new InMemoryChatRoomService(new LinkedHashMap<>()); + ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize); + chatrooms.put(chatRoom.getId(), chatRoom); + return Mono.just(chatRoom); + } + @Override - public Chatroom createChatroom(UUID id, String name) + public Mono getChatRoom(UUID id) { - InMemoryChatroomService chatroomService = - new InMemoryChatroomService(new LinkedHashMap<>()); - return new Chatroom(id, name, chatroomService, bufferSize); + return Mono.justOrEmpty(chatrooms.get(id)); } - public Chatroom restoreChatroom( - UUID id, - String name, - InMemoryChatroomService chatroomService) + @Override + public Flux getChatRooms() { - return new Chatroom(id, name, chatroomService, bufferSize); + return Flux.fromStream(chatrooms.values().stream()); } }