a4270a742dd93d52c6eee2286179625decf853fe
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.haproxy;
2
3 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
4 import lombok.Getter;
5 import lombok.extern.slf4j.Slf4j;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.HttpStatusCode;
8 import org.springframework.http.MediaType;
9 import org.springframework.web.reactive.function.client.WebClient;
10 import org.springframework.web.reactive.function.client.WebClientResponseException;
11 import reactor.core.publisher.Mono;
12
13 import java.time.Duration;
14 import java.util.Map;
15
16
17 @Slf4j
18 public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy
19 {
20   public final static String MAPS_PATH = "/services/haproxy/runtime/maps";
21   public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged";
22   public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}";
23   public final static String MAP_PARAM = "map";
24
25
26   private final WebClient webClient;
27   @Getter
28   private final int mapId;
29   private final String instanceId;
30
31
32   public HaproxyDataPlaneApiShardingPublisherStrategy(
33       WebClient webClient,
34       String mapPath,
35       String instanceId)
36   {
37     this.webClient = webClient;
38     this.mapId = webClient
39         .get()
40         .uri(uriBuilder -> uriBuilder
41             .path(MAPS_PATH)
42             .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE)
43             .build())
44         .accept(MediaType.APPLICATION_JSON)
45         .exchangeToFlux(response ->
46         {
47           if (response.statusCode().equals(HttpStatus.OK))
48           {
49             return response.bodyToFlux(MapInfoTo.class);
50           }
51           else
52           {
53             return response.<MapInfoTo>createError().flux();
54           }
55         })
56         .filter(map -> map.file().trim().equals(mapPath))
57         .map(map -> map.id())
58         .map(id -> Integer.valueOf(id))
59         .blockFirst(Duration.ofSeconds(1));
60     this.instanceId = instanceId;
61   }
62
63
64   @Override
65   public Mono<String> publishOwnership(int shard)
66   {
67     return webClient
68         .put()
69         .uri(uriBuilder -> uriBuilder
70             .path(MAP_ENTRY_PATH)
71             .queryParam(MAP_PARAM, "#" + mapId)
72             .build(Map.of("key", shard)))
73         .contentType(MediaType.APPLICATION_JSON)
74         .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId))
75         .accept(MediaType.APPLICATION_JSON)
76         .exchangeToMono(response ->
77         {
78           HttpStatusCode statusCode = response.statusCode();
79           if (statusCode.equals(HttpStatus.OK))
80           {
81             return response.bodyToMono(MapEntryTo.class);
82           }
83           else
84           {
85             return response
86                 .<MapEntryTo>createError()
87                 .onErrorMap(
88                     WebClientResponseException.class,
89                     e -> new DataPlaneApiErrorException(
90                         e.getStatusCode(),
91                         e.getResponseBodyAs(DataPlaneApiErrorTo.class)));
92           }
93         })
94         .map(entry -> entry.value());
95   }
96 }