.fromArray(chatRooms)
.flatMap(chatRoom ->
{
- log.info("Requesting messages for chat-room {}", chatRoom);
List<MessageTo> list = new LinkedList<>();
receivedMessages.put(chatRoom.getId(), list);
- return receiveMessages(chatRoom)
- .flatMap(sse ->
- {
- try
- {
- return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
- }
- catch (Exception e)
- {
- return Mono.error(e);
- }
- })
- .doOnNext(message -> list.add(message))
- .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
- .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe));
+ return receiveMessages(chatRoom);
});
}
- Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+ Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ log.info("Requesting messages for chat-room {}", chatRoom);
+ List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+ return receiveServerSentEvents(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }
+ })
+ .doOnNext(message -> list.add(message))
+ .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
+ .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe));
+ }
+
+ Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
{
return webClient
.get()