From 25fcd0e75de4fcadf400e2f1288d7d0c191be4fb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Mar 2024 15:08:06 +0100 Subject: [PATCH] test: HandoverIT-POC - Refactored listening into class `TestListener` --- .../chat/backend/AbstractHandoverIT.java | 26 ++---- .../kafka/chat/backend/TestListener.java | 80 +++++++++++++++++++ 2 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/TestListener.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index 41d9e49a..4df4c1e7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -28,7 +28,7 @@ public abstract class AbstractHandoverIT private final AbstractHandoverITContainers containers; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS); + private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1); AbstractHandoverIT(AbstractHandoverITContainers containers) @@ -58,17 +58,15 @@ public abstract class AbstractHandoverIT .toStream() .toArray(size -> new TestWriter[size]); + TestListener testListener = new TestListener(port, chatRooms); + executorService.execute(testListener); + Thread.sleep(10000); + Arrays .stream(testWriters) .forEach(testClient -> testClient.running = false); - - Flux - .fromArray(chatRooms) - .flatMap(chatRoom ->receiveMessages(chatRoom).take(100)) - .doOnNext(message -> log.info("message: {}", message)) - .then() - .block(); + testListener.running = false; } Mono createChatRoom(String name) @@ -92,18 +90,6 @@ public abstract class AbstractHandoverIT }); } - Flux> receiveMessages(ChatRoomInfoTo chatRoom) - { - return webClient - .get() - .uri( - "/{chatRoomId}/listen", - chatRoom.getId()) - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(SSE_TYPE); - } - WebClient webClient; diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java new file mode 100644 index 00000000..0ea86287 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java @@ -0,0 +1,80 @@ +package de.juplo.kafka.chat.backend; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; +import de.juplo.kafka.chat.backend.api.MessageTo; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + + +@Slf4j +public class TestListener implements Runnable +{ + static final ParameterizedTypeReference> SSE_TYPE = new ParameterizedTypeReference<>() {}; + + + @Override + public void run() + { + Flux + .fromArray(chatRooms) + .flatMap(chatRoom -> receiveMessages(chatRoom) + .flatMap(sse -> + { + try + { + return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class)); + } + catch (Exception e) + { + return Mono.error(e); + } + }) + .take(30)) + .takeUntil(message -> !running) + .subscribe(message -> log.info("Received message: {}", message)); + } + + Flux> receiveMessages(ChatRoomInfoTo chatRoom) + { + return webClient + .get() + .uri( + "/{chatRoomId}/listen", + chatRoom.getId()) + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(SSE_TYPE); + } + + + private final WebClient webClient; + private final ChatRoomInfoTo[] chatRooms; + private final ObjectMapper objectMapper; + + volatile boolean running = true; + + + TestListener(Integer port, ChatRoomInfoTo[] chatRooms) + { + webClient = WebClient.create("http://localhost:" + port); + this.chatRooms = chatRooms; + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } +} -- 2.20.1