From 764392103f804972ca81072bd2a9470a9b575d4f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Mar 2024 15:08:06 +0100 Subject: [PATCH] test: HandoverIT-POC - Refactored listening into class `TestListener` --- .../chat/backend/AbstractHandoverIT.java | 27 ++---- .../kafka/chat/backend/TestListener.java | 85 +++++++++++++++++++ 2 files changed, 91 insertions(+), 21 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/TestListener.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index 41d9e49a..6445755e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -22,13 +22,12 @@ import java.util.concurrent.Executors; @Slf4j public abstract class AbstractHandoverIT { - static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; static final int NUM_CHATROOMS = 23; static final int NUM_CLIENTS = 17; private final AbstractHandoverITContainers containers; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS); + private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1); AbstractHandoverIT(AbstractHandoverITContainers containers) @@ -58,17 +57,15 @@ public abstract class AbstractHandoverIT .toStream() .toArray(size -> new TestWriter[size]); + TestListener testListener = new TestListener(port, chatRooms); + executorService.execute(testListener); + Thread.sleep(10000); + Arrays .stream(testWriters) .forEach(testClient -> testClient.running = false); - - Flux - .fromArray(chatRooms) - .flatMap(chatRoom ->receiveMessages(chatRoom).take(100)) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); + testListener.running = false; } Mono createChatRoom(String name) @@ -92,18 +89,6 @@ public abstract class AbstractHandoverIT }); } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(SSE_TYPE); - } - WebClient webClient; diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java new file mode 100644 index 00000000..1c993c17 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -0,0 +1,85 @@ +package de.juplo.kafka.chat.backend; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + + +@Slf4j +public class TestListener implements Runnable +{ + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + @Override + public void run() + { + Flux + .fromArray(chatRooms) + .flatMap(chatRoom -> receiveMessages(chatRoom) + .flatMap(sse -> + { + try + { + return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); + } + catch (Exception e) + { + return Mono.error(e); + } + }) + .doOnNext(message -> log.info( + "Received a message from chat-room {}: {}", + chatRoom, + message)) + .take(30)) + .takeUntil(message -> !running) + .then() + .block(); + } + + Flux> receiveMessages(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(SSE_TYPE); + } + + + private final WebClient webClient; + private final ChatRoomInfoTo[] chatRooms; + private final ObjectMapper objectMapper; + + volatile boolean running = true; + + + TestListener(Integer port, ChatRoomInfoTo[] chatRooms) + { + webClient = WebClient.create("http://localhost:" + port); + this.chatRooms = chatRooms; + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } +} -- 2.20.1