WIP:test: `*ConfigurationIT` asserts, if restored messages can be seen
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
1 package de.juplo.kafka.chat.backend;
2
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
19
20
21 @Slf4j
22 public class TestListener
23 {
24   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
25
26
27   public Flux<MessageTo> run()
28   {
29     return Flux
30         .fromArray(chatRooms)
31         .flatMap(chatRoom ->
32         {
33           List<MessageTo> list = new LinkedList<>();
34           receivedMessages.put(chatRoom.getId(), list);
35           return receiveMessages(chatRoom);
36         });
37   }
38
39   Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
40   {
41     log.info("Requesting messages for chat-room {}", chatRoom);
42     List<MessageTo> list = receivedMessages.get(chatRoom.getId());
43     return receiveServerSentEvents(chatRoom)
44         .flatMap(sse ->
45         {
46           try
47           {
48             return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
49           }
50           catch (Exception e)
51           {
52             return Mono.error(e);
53           }
54         })
55         .doOnNext(message -> list.add(message))
56         .doOnComplete(() -> log.info("{} was completed!", chatRoom))
57         .doOnError(throwalbe -> log.error("{} failed: {}", chatRoom, throwalbe))
58         .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
59   }
60
61   Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
62   {
63     return webClient
64         .get()
65         .uri(
66             "/{chatRoomId}/listen",
67             chatRoom.getId())
68         .accept(MediaType.TEXT_EVENT_STREAM)
69         .retrieve()
70         .bodyToFlux(SSE_TYPE)
71         .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
72   }
73
74
75   private final WebClient webClient;
76   private final ChatRoomInfoTo[] chatRooms;
77   private final ObjectMapper objectMapper;
78
79   final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
80
81
82   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
83   {
84     webClient = WebClient.create("http://localhost:" + port);
85     this.chatRooms = chatRooms;
86     objectMapper = new ObjectMapper();
87     objectMapper.registerModule(new JavaTimeModule());
88     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
89   }
90 }