1 package de.juplo.kafka.chat.backend;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.HttpStatus;
11 import org.springframework.http.MediaType;
12 import org.springframework.http.codec.ServerSentEvent;
13 import org.springframework.web.reactive.function.client.WebClient;
14 import org.springframework.web.reactive.function.client.WebClientResponseException;
15 import reactor.core.publisher.Flux;
16 import reactor.core.publisher.Mono;
17 import reactor.util.retry.Retry;
19 import java.nio.charset.Charset;
20 import java.time.Duration;
21 import java.util.concurrent.ThreadLocalRandom;
25 public class TestListener implements Runnable
27 static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
35 .flatMap(chatRoom -> receiveMessages(chatRoom)
40 return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
48 .takeUntil(message -> !running)
49 .subscribe(message -> log.info("Received message: {}", message));
52 Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
57 "/{chatRoomId}/listen",
59 .accept(MediaType.TEXT_EVENT_STREAM)
61 .bodyToFlux(SSE_TYPE);
65 private final WebClient webClient;
66 private final ChatRoomInfoTo[] chatRooms;
67 private final ObjectMapper objectMapper;
69 volatile boolean running = true;
72 TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
74 webClient = WebClient.create("http://localhost:" + port);
75 this.chatRooms = chatRooms;
76 objectMapper = new ObjectMapper();
77 objectMapper.registerModule(new JavaTimeModule());
78 objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);