test: HandoverIT-POC - Listener/Writer remember received/sent messages
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
1 package de.juplo.kafka.chat.backend;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.MediaType;
11 import org.springframework.http.codec.ServerSentEvent;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16 import java.util.*;
17
18
19 @Slf4j
20 public class TestListener implements Runnable
21 {
22   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
23
24
25   @Override
26   public void run()
27   {
28     Flux
29         .fromArray(chatRooms)
30         .flatMap(chatRoom ->
31         {
32           List<MessageTo> list = new LinkedList<>();
33           receivedMessages.put(chatRoom.getId(), list);
34           return receiveMessages(chatRoom)
35               .flatMap(sse ->
36               {
37                 try
38                 {
39                   return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
40                 }
41                 catch (Exception e)
42                 {
43                   return Mono.error(e);
44                 }
45               })
46               .doOnNext(message ->
47               {
48                 list.add(message);
49                 log.info(
50                     "Received a message from chat-room {}: {}",
51                     chatRoom,
52                     message);
53               })
54               .take(30);
55         })
56         .takeUntil(message -> !running)
57         .then()
58         .block();
59   }
60
61   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
62   {
63     return webClient
64         .get()
65         .uri(
66             "/{chatRoomId}/listen",
67             chatRoom.getId())
68         .accept(MediaType.TEXT_EVENT_STREAM)
69         .retrieve()
70         .bodyToFlux(SSE_TYPE);
71   }
72
73
74   private final WebClient webClient;
75   private final ChatRoomInfoTo[] chatRooms;
76   private final ObjectMapper objectMapper;
77
78   final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
79
80   volatile boolean running = true;
81
82
83   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
84   {
85     webClient = WebClient.create("http://localhost:" + port);
86     this.chatRooms = chatRooms;
87     objectMapper = new ObjectMapper();
88     objectMapper.registerModule(new JavaTimeModule());
89     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
90   }
91 }