FIX:take_vs_limitRate
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
1 package de.juplo.kafka.chat.backend;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.MediaType;
11 import org.springframework.http.codec.ServerSentEvent;
12 import org.springframework.web.reactive.function.client.WebClient;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15 import reactor.core.scheduler.Schedulers;
16
17 import java.util.*;
18
19
20 @Slf4j
21 public class TestListener
22 {
23   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
24
25
26   public Mono<Void> run()
27   {
28     return Flux
29         .fromArray(chatRooms)
30         .flatMap(chatRoom ->
31         {
32           log.info("Requesting messages from chat-room {}", chatRoom);
33           List<MessageTo> list = new LinkedList<>();
34           receivedMessages.put(chatRoom.getId(), list);
35           return receiveMessages(chatRoom)
36               .flatMap(sse ->
37               {
38                 try
39                 {
40                   return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
41                 }
42                 catch (Exception e)
43                 {
44                   return Mono.error(e);
45                 }
46               })
47               .doOnNext(message ->
48               {
49                 list.add(message);
50                 log.info(
51                     "Received a message from chat-room {}: {}",
52                     chatRoom.getName(),
53                     message);
54               });
55         })
56         .limitRate(10)
57         .takeUntil(message -> !running)
58         .doOnComplete(() -> log.info("TestListener is done"))
59         .parallel(chatRooms.length)
60         .runOn(Schedulers.parallel())
61         .then();
62   }
63
64   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
65   {
66     return webClient
67         .get()
68         .uri(
69             "/{chatRoomId}/listen",
70             chatRoom.getId())
71         .accept(MediaType.TEXT_EVENT_STREAM)
72         .retrieve()
73         .bodyToFlux(SSE_TYPE);
74   }
75
76
77   private final WebClient webClient;
78   private final ChatRoomInfoTo[] chatRooms;
79   private final ObjectMapper objectMapper;
80
81   final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
82
83   volatile boolean running = true;
84
85
86   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
87   {
88     webClient = WebClient.create("http://localhost:" + port);
89     this.chatRooms = chatRooms;
90     objectMapper = new ObjectMapper();
91     objectMapper.registerModule(new JavaTimeModule());
92     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
93   }
94 }