WIP:fix-sse
authorKai Moritz <kai@juplo.de>
Sun, 25 Feb 2024 20:44:37 +0000 (21:44 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 25 Feb 2024 20:44:37 +0000 (21:44 +0100)
src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java

index 7108ffb..35e261c 100644 (file)
@@ -11,8 +11,10 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.testcontainers.containers.*;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -45,8 +47,6 @@ class KafkaHandoverIT extends AbstractHandoverIT
         .forEach(result -> log.info("{}", result));
 
     receiveMessages(chatRoom).subscribe(message -> log.info("message: {}", message));
-
-    Thread.sleep(1000000);
   }
 
   Mono<ChatRoomInfoTo> createChatRoom(String name)
@@ -98,16 +98,16 @@ class KafkaHandoverIT extends AbstractHandoverIT
         });
   }
 
-  Flux<byte[]> receiveMessages(ChatRoomInfoTo chatRoom)
+  Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
   {
     return webClient
         .get()
         .uri(
             "/{chatRoomId}/listen",
             chatRoom.getId())
-        .accept(MediaType.APPLICATION_OCTET_STREAM)
+        .accept(MediaType.TEXT_EVENT_STREAM)
         .retrieve()
-        .bodyToFlux(byte[].class);
+        .bodyToFlux(sseType);
   }
 
   @BeforeEach
@@ -327,4 +327,6 @@ class KafkaHandoverIT extends AbstractHandoverIT
   {
     String status;
   }
+
+  ParameterizedTypeReference<ServerSentEvent<String>> sseType = new ParameterizedTypeReference<>() {};
 }