- return receiveMessages(chatRoom)
- .flatMap(sse ->
- {
- try
- {
- return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
- }
- catch (Exception e)
- {
- return Mono.error(e);
- }
- })
- .doOnNext(message ->
- {
- list.add(message);
- log.info(
- "Received a message from chat-room {}: {}",
- chatRoom,
- message);
- })
- .take(30);
+ return receiveMessages(chatRoom);
+ });
+ }
+
+ Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ log.info("Requesting messages for chat-room {}", chatRoom);
+ List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+ return receiveServerSentEvents(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }