From: Kai Moritz Date: Thu, 22 Feb 2024 15:03:08 +0000 (+0100) Subject: fix: GREEN - Fixed the restore-mechanism X-Git-Tag: rebase--2024-02-23--16-28~20 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dc53848961fd5622f777621fd4140cb01c2c8739;p=demos%2Fkafka%2Fchat fix: GREEN - Fixed the restore-mechanism * The code of a reactive flow _must not_ call blocking functions. * In order to solve this, the restore-process is triggered explicitly after the creation of the classes. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 0345c008..7d4b9b62 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -2,30 +2,40 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.ChatMessageService; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; import java.util.LinkedHashMap; +import java.util.UUID; @Slf4j public class InMemoryChatMessageService implements ChatMessageService { + private final UUID chatRoomId; private final LinkedHashMap messages; - public InMemoryChatMessageService(Flux messageFlux) + public InMemoryChatMessageService(UUID chatRoomId) { log.debug("Creating InMemoryChatMessageService"); + this.chatRoomId = chatRoomId; messages = new LinkedHashMap<>(); - messageFlux + } + + + Mono restore(StorageStrategy storageStrategy) + { + Flux messageFlux = storageStrategy.readChatRoomData(chatRoomId); + + return messageFlux .doOnNext(message -> messages.put(message.getKey(), message)) .then() .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService")) - .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")) - .block(); + .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 5b5785ea..518cf41b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -31,10 +31,11 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy, Clock clock) { - return new SimpleChatHomeService( - storageStrategy, + SimpleChatHomeService chatHomeService = new SimpleChatHomeService( clock, properties.getChatroomBufferSize()); + chatHomeService.restore(storageStrategy).block(); + return chatHomeService; } @Bean @@ -51,11 +52,14 @@ public class InMemoryServicesConfiguration SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - properties.getChatroomBufferSize())); + .forEach(shard -> + { + SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService( + shard, + clock, + properties.getChatroomBufferSize()); + service.restore(storageStrategy).block(); + }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( properties.getInstanceId(), diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index dfe8567a..5c3fe2e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -23,20 +23,17 @@ public class SimpleChatHomeService implements ChatHomeService public SimpleChatHomeService( - StorageStrategy storageStrategy, Clock clock, int bufferSize) { this( null, - storageStrategy, clock, bufferSize); } public SimpleChatHomeService( Integer shard, - StorageStrategy storageStrategy, Clock clock, int bufferSize) { @@ -45,8 +42,14 @@ public class SimpleChatHomeService implements ChatHomeService this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); + this.clock = clock; + this.bufferSize = bufferSize; + } - storageStrategy + + Mono restore(StorageStrategy storageStrategy) + { + return storageStrategy .readChatRoomInfo() .filter(info -> { @@ -63,26 +66,25 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .doOnNext(info -> + .flatMap(info -> { UUID chatRoomId = info.getId(); + InMemoryChatMessageService chatMessageService = + new InMemoryChatMessageService(chatRoomId); + chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); chatRoomData.put( info.getId(), new ChatRoomData( clock, - new InMemoryChatMessageService(messageFlux), + chatMessageService, bufferSize)); + + return chatMessageService.restore(storageStrategy); }) .then() .doOnSuccess(empty -> log.info("Restored {}", this)) - .doOnError(throwable -> log.error("Could not restore {}", this)) - .block(); - - this.clock = clock; - this.bufferSize = bufferSize; + .doOnError(throwable -> log.error("Could not restore {}", this)); } @@ -90,7 +92,7 @@ public class SimpleChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatMessageService service = new InMemoryChatMessageService(Flux.empty()); + ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);