From ca955f1a529d7ab0d320282da9a07658a5988125 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 25 Feb 2024 21:44:37 +0100 Subject: [PATCH] WIP:fix-sse --- .../de/juplo/kafka/chat/backend/KafkaHandoverIT.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 7108ffb0..35e261c7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -11,8 +11,10 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.*; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -45,8 +47,6 @@ class KafkaHandoverIT extends AbstractHandoverIT .forEach(result -> log.info("{}", result)); receiveMessages(chatRoom).subscribe(message -> log.info("message: {}", message)); - - Thread.sleep(1000000); } Mono createChatRoom(String name) @@ -98,16 +98,16 @@ class KafkaHandoverIT extends AbstractHandoverIT }); } - Flux receiveMessages(ChatRoomInfoTo chatRoom) + Flux> receiveMessages(ChatRoomInfoTo chatRoom) { return webClient .get() .uri( "/{chatRoomId}/listen", chatRoom.getId()) - .accept(MediaType.APPLICATION_OCTET_STREAM) + .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(byte[].class); + .bodyToFlux(sseType); } @BeforeEach @@ -327,4 +327,6 @@ class KafkaHandoverIT extends AbstractHandoverIT { String status; } + + ParameterizedTypeReference> sseType = new ParameterizedTypeReference<>() {}; } -- 2.20.1