From 31bbbc9858ab6fde12bbe89b28d561e1f4bf3aa4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Mar 2024 18:32:46 +0100 Subject: [PATCH] test: HandoverIT-POC - GREEN: The `TestListener` automatically reconnects * If a rebalance happens, an instances, that is no longer responsible for a specific chat-room, closes the connections to listeners of these chat-rooms. * Hence, the `TestListener`, like the real frontend, will have to reconnect, if that happens, to be able to see newly send messages. --- .../java/de/juplo/kafka/chat/backend/TestListener.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java index 5bd36468..092cd43d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -12,7 +12,9 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import java.time.Duration; import java.util.*; @@ -52,7 +54,8 @@ public class TestListener }) .doOnNext(message -> list.add(message)) .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom)) - .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)); + .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe)) + .thenMany(Flux.defer(() -> receiveMessages(chatRoom))); } Flux> receiveServerSentEvents(ChatRoomInfoTo chatRoom) @@ -65,7 +68,8 @@ public class TestListener .accept(MediaType.TEXT_EVENT_STREAM) .header("X-Shard", chatRoom.getShard().toString()) .retrieve() - .bodyToFlux(SSE_TYPE); + .bodyToFlux(SSE_TYPE) + .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1))); } -- 2.20.1