X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Finmemory%2FSimpleChatHomeService.java;h=d568a9b49bc0a7350c49173e2b1ab39ed1794276;hb=4f2a6dfe5a1c620ad795328853c1be71a2581771;hp=5ed039ea5cf35c807900557757e69b91cc37dfe7;hpb=d958fc6f355071a567cf2b1b048c53e124fb4f00;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 5ed039ea..d568a9b4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -23,30 +23,36 @@ public class SimpleChatHomeService implements ChatHomeService public SimpleChatHomeService( - StorageStrategy storageStrategy, Clock clock, int bufferSize) { this( null, - storageStrategy, clock, bufferSize); } public SimpleChatHomeService( Integer shard, - StorageStrategy storageStrategy, Clock clock, int bufferSize) { -; - this.shard = shard; - log.info("Created {}", this); + log.debug("Creating SimpleChatHomeService"); + this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); - storageStrategy + this.clock = clock; + this.bufferSize = bufferSize; + } + + + Mono restore(StorageStrategy storageStrategy) + { + chatRoomInfo.clear(); + chatRoomData.clear(); + + return storageStrategy .readChatRoomInfo() .filter(info -> { @@ -63,22 +69,26 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .toStream() - .forEach(info -> + .flatMap(info -> { UUID chatRoomId = info.getId(); + InMemoryChatMessageService chatMessageService = + new InMemoryChatMessageService(chatRoomId); + chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); chatRoomData.put( info.getId(), new ChatRoomData( clock, - new InMemoryChatMessageService(messageFlux), + chatMessageService, bufferSize)); - }); - this.clock = clock; - this.bufferSize = bufferSize; + + return chatMessageService.restore(storageStrategy); + }) + .count() + .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count)) + .doOnError(throwable -> log.error("Could not restore {}", this)) + .then(); } @@ -86,7 +96,7 @@ public class SimpleChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatMessageService service = new InMemoryChatMessageService(Flux.empty()); + ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);