test: HandoverIT-POC - Refactored listening into class `TestListener`
authorKai Moritz <kai@juplo.de>
Fri, 1 Mar 2024 14:08:06 +0000 (15:08 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 1 Mar 2024 18:53:31 +0000 (19:53 +0100)
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
src/test/java/de/juplo/kafka/chat/backend/TestListener.java [new file with mode: 0644]

index 41d9e49..4df4c1e 100644 (file)
@@ -28,7 +28,7 @@ public abstract class AbstractHandoverIT
 
 
   private final AbstractHandoverITContainers containers;
-  private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS);
+  private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1);
 
 
   AbstractHandoverIT(AbstractHandoverITContainers containers)
@@ -58,17 +58,15 @@ public abstract class AbstractHandoverIT
         .toStream()
         .toArray(size -> new TestWriter[size]);
 
+    TestListener testListener = new TestListener(port, chatRooms);
+    executorService.execute(testListener);
+
     Thread.sleep(10000);
+
     Arrays
         .stream(testWriters)
         .forEach(testClient -> testClient.running = false);
-
-    Flux
-        .fromArray(chatRooms)
-        .flatMap(chatRoom ->receiveMessages(chatRoom).take(100))
-        .doOnNext(message -> log.info("message: {}", message))
-        .then()
-        .block();
+    testListener.running = false;
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
@@ -92,18 +90,6 @@ public abstract class AbstractHandoverIT
         });
   }
 
-  Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
-  {
-    return webClient
-        .get()
-        .uri(
-            "/{chatRoomId}/listen",
-            chatRoom.getId())
-        .accept(MediaType.TEXT_EVENT_STREAM)
-        .retrieve()
-        .bodyToFlux(SSE_TYPE);
-  }
-
 
   WebClient webClient;
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java
new file mode 100644 (file)
index 0000000..0ea8628
--- /dev/null
@@ -0,0 +1,80 @@
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+@Slf4j
+public class TestListener implements Runnable
+{
+  static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
+  @Override
+  public void run()
+  {
+    Flux
+        .fromArray(chatRooms)
+        .flatMap(chatRoom -> receiveMessages(chatRoom)
+            .flatMap(sse ->
+            {
+              try
+              {
+                return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+              }
+              catch (Exception e)
+              {
+                return Mono.error(e);
+              }
+            })
+            .take(30))
+        .takeUntil(message -> !running)
+        .subscribe(message -> log.info("Received message: {}", message));
+  }
+
+  Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+  {
+    return webClient
+        .get()
+        .uri(
+            "/{chatRoomId}/listen",
+            chatRoom.getId())
+        .accept(MediaType.TEXT_EVENT_STREAM)
+        .retrieve()
+        .bodyToFlux(SSE_TYPE);
+  }
+
+
+  private final WebClient webClient;
+  private final ChatRoomInfoTo[] chatRooms;
+  private final ObjectMapper objectMapper;
+
+  volatile boolean running = true;
+
+
+  TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
+  {
+    webClient = WebClient.create("http://localhost:" + port);
+    this.chatRooms = chatRooms;
+    objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new JavaTimeModule());
+    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+}