1 package de.juplo.kafka.chat.backend;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.MediaType;
11 import org.springframework.http.codec.ServerSentEvent;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15 import reactor.util.retry.Retry;
17 import java.time.Duration;
/**
 * Test helper that listens to chat-rooms via server-sent events and records
 * every received message per chat-room in {@link #receivedMessages}.
 *
 * NOTE(review): several statements of this class (operator calls, braces,
 * the source of the logger named log) are not visible here; comments below
 * only describe what the visible lines establish and hedge everything else.
 */
22 public class TestListener
/**
 * Type token for {@code ServerSentEvent<String>}.
 * Presumably handed to the WebClient when the response body is converted
 * into a Flux - the call site is not visible, confirm.
 */
24 static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
/**
 * Starts listening.
 *
 * The visible statements create an empty list for a chat-room, register it
 * in {@link #receivedMessages} under the id of that chat-room and delegate
 * to {@link #receiveMessages(ChatRoomInfoTo)}.
 *
 * NOTE(review): where chatRoom comes from is not visible - presumably the
 * entries of the chatRooms field are iterated; confirm.
 *
 * @return the Flux returned by receiveMessages(chatRoom); how the results
 *         for several chat-rooms are combined is not visible here
 */
27 public Flux<MessageTo> run()
// Fresh, empty container for the messages of this chat-room.
33 List<MessageTo> list = new LinkedList<>();
// Registered before subscribing, because receiveMessages() looks the list up by id.
34 receivedMessages.put(chatRoom.getId(), list);
35 return receiveMessages(chatRoom);
/**
 * Receives the messages of one chat-room and stores them in the list that
 * was registered for it in {@link #receivedMessages}.
 *
 * Each server-sent event is deserialized into a MessageTo and appended to
 * that list as a side effect. When the event stream completes, the method
 * is invoked again for the same chat-room.
 *
 * @param chatRoom the chat-room to listen to; its id must already be
 *                 registered in receivedMessages (otherwise list is null
 *                 and list.add(...) fails)
 * @return a Flux that ends in thenMany(...): per the Reactor documentation
 *         thenMany ignores the elements of the upstream, so received
 *         messages are only observable through receivedMessages
 */
39 Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
41 log.info("Requesting messages for chat-room {}", chatRoom);
42 List<MessageTo> list = receivedMessages.get(chatRoom.getId());
43 return receiveServerSentEvents(chatRoom)
// Deserializes the payload of the event (sse.data()) into a MessageTo.
// NOTE(review): the enclosing operator and the error handling around
// readValue are not visible - presumably a flatMap with try/catch; confirm.
48 return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
// Side effect: record every deserialized message for this chat-room.
55 .doOnNext(message -> list.add(message))
56 .doOnComplete(() -> log.info("{} was completed!", chatRoom))
// NOTE(review): "throwalbe" is a typo for "throwable" (local lambda parameter only).
// NOTE(review): the Throwable is the last logging argument; SLF4J may treat
// it as the exception to log instead of substituting it into the second
// placeholder - confirm the resulting log output.
57 .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe))
// After the upstream completed, start listening again: Flux.defer delays the
// recursive call until thenMany subscribes to it.
58 .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
/**
 * Requests the event stream of a chat-room.
 *
 * The visible lines use the URI template "/{chatRoomId}/listen", accept
 * text/event-stream and retry up to 15 times with a fixed delay of one
 * second between attempts.
 *
 * NOTE(review): the HTTP method, the value bound to chatRoomId (presumably
 * chatRoom.getId()) and the conversion of the response into a Flux
 * (presumably using SSE_TYPE) are not visible - confirm.
 *
 * @param chatRoom the chat-room whose events are requested
 * @return the stream of server-sent events with String payload
 */
61 Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
66 "/{chatRoomId}/listen",
68 .accept(MediaType.TEXT_EVENT_STREAM)
// Retry.fixedDelay(maxAttempts, delay): at most 15 retries, 1 second apart.
71 .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
// Client bound to the base URL built in the constructor.
75 private final WebClient webClient;
// Chat-rooms handed to the constructor.
76 private final ChatRoomInfoTo[] chatRooms;
// Mapper used to deserialize the payload of the received events.
77 private final ObjectMapper objectMapper;
// Received messages per chat-room id; package-private, presumably so that
// tests can inspect it - confirm.
// NOTE(review): HashMap and LinkedList are not thread-safe; verify that all
// reads/writes happen on one thread or are otherwise ordered.
79 final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
/**
 * Creates a listener for the given chat-rooms.
 *
 * @param port      port of the server on localhost; used to build the base
 *                  URL "http://localhost:" + port of the WebClient
 * @param chatRooms the chat-rooms, stored as given (no defensive copy)
 */
82 TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
84 webClient = WebClient.create("http://localhost:" + port);
85 this.chatRooms = chatRooms;
86 objectMapper = new ObjectMapper();
// Registers the java.time (JSR-310) module on the mapper.
87 objectMapper.registerModule(new JavaTimeModule());
// Disables timestamp-style date output; this is a serialization feature.
88 objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);