WIP:test: HandoverIT-POC - Refactored listening into class `TestListener`
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / TestListener.java
1 package de.juplo.kafka.chat.backend;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.fasterxml.jackson.databind.SerializationFeature;
5 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.core.ParameterizedTypeReference;
10 import org.springframework.http.HttpStatus;
11 import org.springframework.http.MediaType;
12 import org.springframework.http.codec.ServerSentEvent;
13 import org.springframework.web.reactive.function.client.WebClient;
14 import org.springframework.web.reactive.function.client.WebClientResponseException;
15 import reactor.core.publisher.Flux;
16 import reactor.core.publisher.Mono;
17 import reactor.util.retry.Retry;
18
19 import java.nio.charset.Charset;
20 import java.time.Duration;
21 import java.util.concurrent.ThreadLocalRandom;
22
23
24 @Slf4j
25 public class TestListener implements Runnable
26 {
27   static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
28
29
30   @Override
31   public void run()
32   {
33     Flux
34         .fromArray(chatRooms)
35         .flatMap(chatRoom -> receiveMessages(chatRoom)
36             .flatMap(sse ->
37             {
38               try
39               {
40                 return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
41               }
42               catch (Exception e)
43               {
44                 return Mono.error(e);
45               }
46             })
47             .doOnNext(message -> log.info(
48                 "Received a message from chat-room {}: {}",
49                 chatRoom,
50                 message))
51             .take(30))
52         .takeUntil(message -> !running)
53         .then()
54         .block();
55   }
56
57   Flux<ServerSentEvent<String>> receiveMessages(ChatRoomInfoTo chatRoom)
58   {
59     return webClient
60         .get()
61         .uri(
62             "/{chatRoomId}/listen",
63             chatRoom.getId())
64         .accept(MediaType.TEXT_EVENT_STREAM)
65         .retrieve()
66         .bodyToFlux(SSE_TYPE);
67   }
68
69
70   private final WebClient webClient;
71   private final ChatRoomInfoTo[] chatRooms;
72   private final ObjectMapper objectMapper;
73
74   volatile boolean running = true;
75
76
77   TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
78   {
79     webClient = WebClient.create("http://localhost:" + port);
80     this.chatRooms = chatRooms;
81     objectMapper = new ObjectMapper();
82     objectMapper.registerModule(new JavaTimeModule());
83     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
84   }
85 }