From: Kai Moritz Date: Mon, 18 Mar 2024 14:00:21 +0000 (+0100) Subject: feat: GREEN - Implemented `HaproxyDataPlaneApiShardingPublisherStrategy` X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7b06f4c85657100050a8eaf18ff6330c6b8d2dad;p=demos%2Fkafka%2Fchat feat: GREEN - Implemented `HaproxyDataPlaneApiShardingPublisherStrategy` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java new file mode 100644 index 00000000..e6febe7f --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java @@ -0,0 +1,28 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import org.springframework.http.HttpStatusCode; + + +public class DataPlaneApiErrorException extends RuntimeException +{ + private final HttpStatusCode statusCode; + private final int errorCode; + private final String message; + + + public DataPlaneApiErrorException( + HttpStatusCode statusCode, + DataPlaneApiErrorTo error) + { + super(error.message()); + this.statusCode = statusCode; + this.errorCode = error.code(); + this.message = error.message(); + } + + @Override + public String toString() + { + return statusCode + " - " + errorCode + ": " + message; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java new file mode 100644 index 00000000..2f5483b0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public record DataPlaneApiErrorTo(int code, String message) +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java index 949104a6..a4270a74 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java @@ -3,15 +3,30 @@ package de.juplo.kafka.chat.backend.implementation.haproxy; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.Map; + @Slf4j public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy { + public final static String MAPS_PATH = "/services/haproxy/runtime/maps"; + public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged"; + public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}"; + public final static String MAP_PARAM = "map"; + + + private final WebClient webClient; @Getter private final int mapId; + private final String instanceId; public HaproxyDataPlaneApiShardingPublisherStrategy( @@ -19,13 +34,63 @@ public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPub String mapPath, String instanceId) { - this.mapId = 0; + this.webClient = webClient; + this.mapId = webClient + .get() + .uri(uriBuilder -> uriBuilder + .path(MAPS_PATH) + .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE) + .build()) + .accept(MediaType.APPLICATION_JSON) + .exchangeToFlux(response -> + { + if (response.statusCode().equals(HttpStatus.OK)) + { + return response.bodyToFlux(MapInfoTo.class); + } + else + { + return response.createError().flux(); + } + }) + .filter(map -> map.file().trim().equals(mapPath)) + .map(map -> map.id()) + .map(id -> Integer.valueOf(id)) + .blockFirst(Duration.ofSeconds(1)); + this.instanceId = instanceId; } @Override public Mono publishOwnership(int shard) { - return Mono.empty(); + return webClient + .put() + .uri(uriBuilder -> uriBuilder + .path(MAP_ENTRY_PATH) + .queryParam(MAP_PARAM, "#" + mapId) + .build(Map.of("key", shard))) + .contentType(MediaType.APPLICATION_JSON) + .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId)) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + { + HttpStatusCode statusCode = response.statusCode(); + if (statusCode.equals(HttpStatus.OK)) + { + return response.bodyToMono(MapEntryTo.class); + } + else + { + return response + .createError() + .onErrorMap( + WebClientResponseException.class, + e -> new DataPlaneApiErrorException( + e.getStatusCode(), + e.getResponseBodyAs(DataPlaneApiErrorTo.class))); + } + }) + .map(entry -> entry.value()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java new file mode 100644 index 00000000..5a98f27e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java @@ -0,0 +1,5 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +public record MapEntryTo(String key, String value) +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java new file mode 100644 index 00000000..dce11cc2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java @@ -0,0 +1,5 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +public record MapInfoTo(String id, String file, String description) +{ +}