test: Fixed `AbstractConfigurationIT#testPutMessageInNewChatRoom()`
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
1 package de.juplo.kafka.chat.backend;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.MediaType;
11 import org.springframework.http.codec.ServerSentEvent;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16 import java.util.*;
17
18
19 @Slf4j
20 public class TestListener
21 {
22   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
23
24
25   public Flux<MessageTo> run()
26   {
27     return Flux
28         .fromArray(chatRooms)
29         .flatMap(chatRoom ->
30         {
31           List<MessageTo> list = new LinkedList<>();
32           receivedMessages.put(chatRoom.getId(), list);
33           return receiveMessages(chatRoom);
34         });
35   }
36
37   Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
38   {
39     log.info("Requesting messages for chat-room {}", chatRoom);
40     List<MessageTo> list = receivedMessages.get(chatRoom.getId());
41     return receiveServerSentEvents(chatRoom)
42         .flatMap(sse ->
43         {
44           try
45           {
46             return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
47           }
48           catch (Exception e)
49           {
50             return Mono.error(e);
51           }
52         })
53         .doOnNext(message -> list.add(message))
54         .doOnComplete(() -> log.info("{} was completed!", chatRoom))
55         .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe));
56   }
57
58   Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
59   {
60     return webClient
61         .get()
62         .uri(
63             "/{chatRoomId}/listen",
64             chatRoom.getId())
65         .accept(MediaType.TEXT_EVENT_STREAM)
66         .retrieve()
67         .bodyToFlux(SSE_TYPE);
68   }
69
70
71   private final WebClient webClient;
72   private final ChatRoomInfoTo[] chatRooms;
73   private final ObjectMapper objectMapper;
74
75   final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
76
77
78   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
79   {
80     webClient = WebClient.create("http://localhost:" + port);
81     this.chatRooms = chatRooms;
82     objectMapper = new ObjectMapper();
83     objectMapper.registerModule(new JavaTimeModule());
84     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
85   }
86 }