test: HandoverIT-POC - Refactored listening into class `TestListener`
authorKai Moritz <kai@juplo.de>
Fri, 1 Mar 2024 14:08:06 +0000 (15:08 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 3 Mar 2024 10:40:57 +0000 (11:40 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java [new file with mode: 0644]

index 41d9e49..6445755 100644 (file)
@@ -22,13 +22,12 @@ import java.util.concurrent.Executors;
 @Slf4j
 public abstract class AbstractHandoverIT
 {
-  static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
   static final int NUM_CHATROOMS = 23;
   static final int NUM_CLIENTS = 17;
 
 
   private final AbstractHandoverITContainers containers;
-  private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS);
+  private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1);
 
 
   AbstractHandoverIT(AbstractHandoverITContainers containers)
@@ -58,17 +57,15 @@ public abstract class AbstractHandoverIT
         .toStream()
         .toArray(size -> new TestWriter[size]);
 
+    TestListener testListener = new TestListener(port, chatRooms);
+    executorService.execute(testListener);
+
     Thread.sleep(10000);
+
     Arrays
         .stream(testWriters)
         .forEach(testClient -> testClient.running = false);
-
-    Flux
-        .fromArray(chatRooms)
-        .flatMap(chatRoom ->receiveMessages(chatRoom).take(100))
-        .doOnNext(message -> log.info("message: {}", message))
-        .then()
-        .block();
+    testListener.running = false;
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
@@ -92,18 +89,6 @@ public abstract class AbstractHandoverIT
         });
   }
 
-  Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
-  {
-    return webClient
-        .get()
-        .uri(
-            "/{chatRoomId}/listen",
-            chatRoom.getId())
-        .accept(MediaType.TEXT_EVENT_STREAM)
-        .retrieve()
-        .bodyToFlux(SSE_TYPE);
-  }
-
 
   WebClient webClient;
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java
new file mode 100644 (file)
index 0000000..1c993c1
--- /dev/null
@@ -0,0 +1,85 @@
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+@Slf4j
+public class TestListener implements Runnable
+{
+  static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
+  @Override
+  public void run()
+  {
+    Flux
+        .fromArray(chatRooms)
+        .flatMap(chatRoom -> receiveMessages(chatRoom)
+            .flatMap(sse ->
+            {
+              try
+              {
+                return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+              }
+              catch (Exception e)
+              {
+                return Mono.error(e);
+              }
+            })
+            .doOnNext(message -> log.info(
+                "Received a message from chat-room {}: {}",
+                chatRoom,
+                message))
+            .take(30))
+        .takeUntil(message -> !running)
+        .then()
+        .block();
+  }
+
+  Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+  {
+    return webClient
+        .get()
+        .uri(
+            "/{chatRoomId}/listen",
+            chatRoom.getId())
+        .accept(MediaType.TEXT_EVENT_STREAM)
+        .retrieve()
+        .bodyToFlux(SSE_TYPE);
+  }
+
+
+  private final WebClient webClient;
+  private final ChatRoomInfoTo[] chatRooms;
+  private final ObjectMapper objectMapper;
+
+  volatile boolean running = true;
+
+
+  TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
+  {
+    webClient = WebClient.create("http://localhost:" + port);
+    this.chatRooms = chatRooms;
+    objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new JavaTimeModule());
+    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+}