From: Kai Moritz Date: Mon, 11 Mar 2024 17:32:46 +0000 (+0100) Subject: WIP:reconnect X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2f2a2105344e1a4c88cb0f5b296e17a5b1b099a7;p=demos%2Fkafka%2Fchat WIP:reconnect --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index e5b25293..3f7fb9d8 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -12,7 +12,9 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import java.time.Duration; import java.util.*; @@ -52,7 +54,8 @@ public class TestListener }) .doOnNext(message -> list.add(message)) .doOnComplete(() -> log.info("{} was completed!", chatRoom)) - .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe)); + .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe)) + .thenMany(Flux.defer(() -> receiveMessages(chatRoom))); } Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) @@ -64,7 +67,8 @@ public class TestListener chatRoom.getId()) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(SSE_TYPE); + .bodyToFlux(SSE_TYPE) + .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1))); }