X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestListener.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FTestListener.java;h=3f7fb9d8b9c9309e7b35cf41b1f8d0bfe11f3e2a;hb=2f2a2105344e1a4c88cb0f5b296e17a5b1b099a7;hp=e5b25293dc11238da625424459cd8fac9e89b604;hpb=7c8ce6a41b6bd2ae7942e08dded8f1c93b7ffd2b;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index e5b25293..3f7fb9d8 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -12,7 +12,9 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import java.time.Duration; import java.util.*; @@ -52,7 +54,8 @@ public class TestListener }) .doOnNext(message -> list.add(message)) .doOnComplete(() -> log.info("{} was completed!", chatRoom)) - .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe)); + .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe)) + .thenMany(Flux.defer(() -> receiveMessages(chatRoom))); } Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) @@ -64,7 +67,8 @@ public class TestListener chatRoom.getId()) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(SSE_TYPE); + .bodyToFlux(SSE_TYPE) + .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1))); }