@Slf4j
public abstract class AbstractHandoverIT
{
- static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
static final int NUM_CHATROOMS = 23;
static final int NUM_CLIENTS = 17;
private final AbstractHandoverITContainers containers;
- private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CLIENTS + 1);
AbstractHandoverIT(AbstractHandoverITContainers containers)
.toStream()
.toArray(size -> new TestWriter[size]);
+ TestListener testListener = new TestListener(port, chatRooms);
+ executorService.execute(testListener);
+
Thread.sleep(2000);
Arrays
.stream(testWriters)
.forEach(testClient -> testClient.running = false);
-
- Flux
- .fromArray(chatRooms)
- .flatMap(chatRoom ->receiveMessages(chatRoom))
- .doOnNext(message -> log.info("message: {}", message))
- .take(50)
- .then()
- .block();
+ testListener.running = false;
}
Mono<ChatRoomInfoTo> createChatRoom(String name)
});
}
- Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
- {
- return webClient
- .get()
- .uri(
- "/{chatRoomId}/listen",
- chatRoom.getId())
- .accept(MediaType.TEXT_EVENT_STREAM)
- .retrieve()
- .bodyToFlux(SSE_TYPE);
- }
-
WebClient webClient;
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+@Slf4j
+public class TestListener implements Runnable
+{
+ static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
+ @Override
+ public void run()
+ {
+ Flux
+ .fromArray(chatRooms)
+ .flatMap(chatRoom -> receiveMessages(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }
+ })
+ .doOnNext(message -> log.info(
+ "Received a message from chat-room {}: {}",
+ chatRoom,
+ message))
+ .limitRate(10))
+ .takeUntil(message -> !running)
+ .then()
+ .block();
+ }
+
+ Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ return webClient
+ .get()
+ .uri(
+ "/{chatRoomId}/listen",
+ chatRoom.getId())
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(SSE_TYPE);
+ }
+
+
+ private final WebClient webClient;
+ private final ChatRoomInfoTo[] chatRooms;
+ private final ObjectMapper objectMapper;
+
+ volatile boolean running = true;
+
+
+ TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
+ {
+ webClient = WebClient.create("http://localhost:" + port);
+ this.chatRooms = chatRooms;
+ objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+ objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+}