From: Kai Moritz Date: Sun, 25 Feb 2024 20:44:37 +0000 (+0100) Subject: WIP:fix-sse X-Git-Tag: rebase--2024-02-26--19-46~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ca955f1a529d7ab0d320282da9a07658a5988125;p=demos%2Fkafka%2Fchat WIP:fix-sse --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java index 7108ffb0..35e261c7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java @@ -11,8 +11,10 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.*; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -45,8 +47,6 @@ class KafkaHandoverIT extends AbstractHandoverIT .forEach(result -> log.info("{}", result)); receiveMessages(chatRoom).subscribe(message -> log.info("message: {}", message)); - - Thread.sleep(1000000); } Mono createChatRoom(String name) @@ -98,16 +98,16 @@ class KafkaHandoverIT extends AbstractHandoverIT }); } - Flux receiveMessages(ChatRoomInfoTo chatRoom) + Flux> receiveMessages(ChatRoomInfoTo chatRoom) { return webClient .get() .uri( "/{chatRoomId}/listen", chatRoom.getId()) - .accept(MediaType.APPLICATION_OCTET_STREAM) + .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(byte[].class); + .bodyToFlux(sseType); } @BeforeEach @@ -327,4 +327,6 @@ class KafkaHandoverIT extends AbstractHandoverIT { String status; } + + ParameterizedTypeReference> sseType = new ParameterizedTypeReference<>() {}; }