+ Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ log.info("Requesting messages for chat-room {}", chatRoom);
+ List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+ return receiveServerSentEvents(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }
+ })
+ .doOnNext(message -> list.add(message))
+ .doOnComplete(() -> log.info("{} was completed!", chatRoom))
+ .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe));
+ }
+
+ Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)