* The code of a reactive flow _must not_ call blocking functions.
* In order to solve this, the restore-process is triggered explicitly
after the creation of the classes.
import de.juplo.kafka.chat.backend.domain.ChatMessageService;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
+import java.util.UUID;
@Slf4j
public class InMemoryChatMessageService implements ChatMessageService
{
+ private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages;
- public InMemoryChatMessageService(Flux<Message> messageFlux)
+ public InMemoryChatMessageService(UUID chatRoomId)
{
log.debug("Creating InMemoryChatMessageService");
+ this.chatRoomId = chatRoomId;
messages = new LinkedHashMap<>();
- messageFlux
+ }
+
+
+ Mono<Void> restore(StorageStrategy storageStrategy)
+ {
+ Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
+
+ return messageFlux
.doOnNext(message -> messages.put(message.getKey(), message))
.then()
.doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
- .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
- .block();
+ .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"));
}
@Override
StorageStrategy storageStrategy,
Clock clock)
{
- return new SimpleChatHomeService(
- storageStrategy,
+ SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
clock,
properties.getChatroomBufferSize());
+ chatHomeService.restore(storageStrategy).block();
+ return chatHomeService;
}
@Bean
SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
IntStream
.of(properties.getInmemory().getOwnedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
- shard,
- storageStrategy,
- clock,
- properties.getChatroomBufferSize()));
+ .forEach(shard ->
+ {
+ SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
+ shard,
+ clock,
+ properties.getChatroomBufferSize());
+ service.restore(storageStrategy).block();
+ });
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHomeService(
properties.getInstanceId(),
public SimpleChatHomeService(
- StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
this(
null,
- storageStrategy,
clock,
bufferSize);
}
public SimpleChatHomeService(
Integer shard,
- StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
this.shard = shard;
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
+ this.clock = clock;
+ this.bufferSize = bufferSize;
+ }
- storageStrategy
+
+ Mono<Void> restore(StorageStrategy storageStrategy)
+ {
+ return storageStrategy
.readChatRoomInfo()
.filter(info ->
{
return false;
}
})
- .doOnNext(info ->
+ .flatMap(info ->
{
UUID chatRoomId = info.getId();
+ InMemoryChatMessageService chatMessageService =
+ new InMemoryChatMessageService(chatRoomId);
+
chatRoomInfo.put(chatRoomId, info);
- Flux<Message> messageFlux =
- storageStrategy.readChatRoomData(chatRoomId);
chatRoomData.put(
info.getId(),
new ChatRoomData(
clock,
- new InMemoryChatMessageService(messageFlux),
+ chatMessageService,
bufferSize));
+
+ return chatMessageService.restore(storageStrategy);
})
.then()
.doOnSuccess(empty -> log.info("Restored {}", this))
- .doOnError(throwable -> log.error("Could not restore {}", this))
- .block();
-
- this.clock = clock;
- this.bufferSize = bufferSize;
+ .doOnError(throwable -> log.error("Could not restore {}", this));
}
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
+ ChatMessageService service = new InMemoryChatMessageService(id);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
this.chatRoomInfo.put(id, chatRoomInfo);
ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);