1 package de.juplo.kafka.chat.backend.implementation.haproxy;
3 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import org.springframework.http.HttpStatus;
7 import org.springframework.http.HttpStatusCode;
8 import org.springframework.http.MediaType;
9 import org.springframework.web.reactive.function.client.WebClient;
10 import org.springframework.web.reactive.function.client.WebClientResponseException;
11 import reactor.core.publisher.Mono;
13 import java.time.Duration;
18 public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy
20 public final static String MAPS_PATH = "/services/haproxy/runtime/maps";
21 public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged";
22 public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}";
23 public final static String MAP_PARAM = "map";
26 private final WebClient webClient;
28 private final int mapId;
29 private final String instanceId;
32 public HaproxyDataPlaneApiShardingPublisherStrategy(
37 this.webClient = webClient;
38 this.mapId = webClient
40 .uri(uriBuilder -> uriBuilder
42 .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE)
44 .accept(MediaType.APPLICATION_JSON)
45 .exchangeToFlux(response ->
47 if (response.statusCode().equals(HttpStatus.OK))
49 return response.bodyToFlux(MapInfoTo.class);
53 return response.<MapInfoTo>createError().flux();
56 .filter(map -> map.file().trim().equals(mapPath))
58 .map(id -> Integer.valueOf(id))
59 .blockFirst(Duration.ofSeconds(1));
60 this.instanceId = instanceId;
65 public Mono<String> publishOwnership(int shard)
69 .uri(uriBuilder -> uriBuilder
71 .queryParam(MAP_PARAM, "#" + mapId)
72 .build(Map.of("key", shard)))
73 .contentType(MediaType.APPLICATION_JSON)
74 .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId))
75 .accept(MediaType.APPLICATION_JSON)
76 .exchangeToMono(response ->
78 HttpStatusCode statusCode = response.statusCode();
79 if (statusCode.equals(HttpStatus.OK))
81 return response.bodyToMono(MapEntryTo.class);
86 .<MapEntryTo>createError()
88 WebClientResponseException.class,
89 e -> new DataPlaneApiErrorException(
91 e.getResponseBodyAs(DataPlaneApiErrorTo.class)));
94 .map(entry -> entry.value());