X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Finmemory%2FSimpleChatHomeService.java;h=8e3cc4305b3df480059eb4d934996f8c36f4bd7e;hb=900422dccb5a92fbceac34caa5e614b0d7f05ad7;hp=30a181eecf2d31f704008124710887934b845376;hpb=33143e121d6bc74b1a1e5c19b443049bf027ade4;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 30a181ee..8e3cc430 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -18,34 +18,41 @@ public class SimpleChatHomeService implements ChatHomeService private final Map chatRoomInfo; private final Map chatRoomData; private final Clock clock; - private final int bufferSize; + private final int historyLimit; public SimpleChatHomeService( - StorageStrategy storageStrategy, Clock clock, - int bufferSize) + int historyLimit) { this( null, - storageStrategy, clock, - bufferSize); + historyLimit); } public SimpleChatHomeService( Integer shard, - StorageStrategy storageStrategy, Clock clock, - int bufferSize) + int historyLimit) { - log.info("Created SimpleChatHome for shard {}", shard); -; + log.debug("Creating SimpleChatHomeService"); + this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); - storageStrategy + this.clock = clock; + this.historyLimit = historyLimit; + } + + + Mono restore(StorageStrategy storageStrategy) + { + chatRoomInfo.clear(); + chatRoomData.clear(); + + return storageStrategy .readChatRoomInfo() .filter(info -> { @@ -62,33 +69,39 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .toStream() - .forEach(info -> + .flatMap(info -> { UUID chatRoomId = info.getId(); + InMemoryChatMessageService chatMessageService = + new InMemoryChatMessageService(chatRoomId); + chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); - chatRoomData.put( - info.getId(), + ChatRoomData chatRoomData = new ChatRoomData( clock, - new InMemoryChatMessageService(messageFlux), - bufferSize)); - }); - this.clock = clock; - this.bufferSize = bufferSize; + chatMessageService, + historyLimit); + chatRoomData.activate(); + this.chatRoomData.put(info.getId(), chatRoomData); + + return chatMessageService.restore(storageStrategy); + }) + .count() + .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count)) + .doOnError(throwable -> log.error("Could not restore {}", this)) + .then(); } @Override public Mono createChatRoom(UUID id, String name) { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatMessageService service = new InMemoryChatMessageService(Flux.empty()); + log.info("Creating ChatRoom with history-limit {}", historyLimit); + ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit); + chatRoomData.activate(); this.chatRoomData.put(id, chatRoomData); return Mono.just(chatRoomInfo); } @@ -114,4 +127,16 @@ public class SimpleChatHomeService implements ChatHomeService .justOrEmpty(chatRoomData.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } + + @Override + public Mono getShardOwners() + { + return Mono.empty(); + } + + @Override + public String toString() + { + return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard; + } }