import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
+import java.time.Duration;
import java.util.*;
static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
- public Mono<Void> run()
+ public Flux<MessageTo> run()
{
return Flux
.fromArray(chatRooms)
.flatMap(chatRoom ->
{
- log.info("Requesting messages from chat-room {}", chatRoom);
List<MessageTo> list = new LinkedList<>();
receivedMessages.put(chatRoom.getId(), list);
- return receiveMessages(chatRoom)
- .flatMap(sse ->
- {
- try
- {
- return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
- }
- catch (Exception e)
- {
- return Mono.error(e);
- }
- })
- .doOnNext(message ->
- {
- list.add(message);
- log.info(
- "Received a message from chat-room {}: {}",
- chatRoom.getName(),
- message);
- });
+ return receiveMessages(chatRoom);
+ });
+ }
+
+ Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ log.info("Requesting messages for chat-room {}", chatRoom);
+ List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+ return receiveServerSentEvents(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }
})
- .limitRate(10)
- .takeUntil(message -> !running)
- .doOnComplete(() -> log.info("TestListener is done"))
- .parallel(chatRooms.length)
- .runOn(Schedulers.parallel())
- .then();
+ .doOnNext(message -> list.add(message))
+ .doOnComplete(() -> log.info("{} was completed!", chatRoom))
+ .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe))
+ .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
}
- Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+ Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
{
return webClient
.get()
chatRoom.getId())
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
- .bodyToFlux(SSE_TYPE);
+ .bodyToFlux(SSE_TYPE)
+ .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
}
final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
- volatile boolean running = true;
-
TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
{