feat: GREEN - Implemented `HaproxyDataPlaneApiShardingPublisherStrategy`
authorKai Moritz <kai@juplo.de>
Mon, 18 Mar 2024 14:00:21 +0000 (15:00 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorException.java
new file mode 100644 (file)
index 0000000..e6febe7
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import org.springframework.http.HttpStatusCode;
+
+
+public class DataPlaneApiErrorException extends RuntimeException
+{
+  private final HttpStatusCode statusCode;
+  private final int errorCode;
+  private final String message;
+
+
+  public DataPlaneApiErrorException(
+      HttpStatusCode statusCode,
+      DataPlaneApiErrorTo error)
+  {
+    super(error.message());
+    this.statusCode = statusCode;
+    this.errorCode = error.code();
+    this.message = error.message();
+  }
+
+  @Override
+  public String toString()
+  {
+    return statusCode + " - " + errorCode + ": " + message;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/DataPlaneApiErrorTo.java
new file mode 100644 (file)
index 0000000..2f5483b
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public record DataPlaneApiErrorTo(int code, String message)
+{
+}
index 949104a..a4270a7 100644 (file)
@@ -3,15 +3,30 @@ package de.juplo.kafka.chat.backend.implementation.haproxy;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
+import java.util.Map;
+
 
 @Slf4j
 public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy
 {
+  public final static String MAPS_PATH = "/services/haproxy/runtime/maps";
+  public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged";
+  public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}";
+  public final static String MAP_PARAM = "map";
+
+
+  private final WebClient webClient;
   @Getter
   private final int mapId;
+  private final String instanceId;
 
 
   public HaproxyDataPlaneApiShardingPublisherStrategy(
@@ -19,13 +34,63 @@ public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPub
       String mapPath,
       String instanceId)
   {
-    this.mapId = 0;
+    this.webClient = webClient;
+    this.mapId = webClient
+        .get()
+        .uri(uriBuilder -> uriBuilder
+            .path(MAPS_PATH)
+            .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE)
+            .build())
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToFlux(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToFlux(MapInfoTo.class);
+          }
+          else
+          {
+            return response.<MapInfoTo>createError().flux();
+          }
+        })
+        .filter(map -> map.file().trim().equals(mapPath))
+        .map(map -> map.id())
+        .map(id -> Integer.valueOf(id))
+        .blockFirst(Duration.ofSeconds(1));
+    this.instanceId = instanceId;
   }
 
 
   @Override
   public Mono<String> publishOwnership(int shard)
   {
-    return Mono.empty();
+    return webClient
+        .put()
+        .uri(uriBuilder -> uriBuilder
+            .path(MAP_ENTRY_PATH)
+            .queryParam(MAP_PARAM, "#" + mapId)
+            .build(Map.of("key", shard)))
+        .contentType(MediaType.APPLICATION_JSON)
+        .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId))
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToMono(response ->
+        {
+          HttpStatusCode statusCode = response.statusCode();
+          if (statusCode.equals(HttpStatus.OK))
+          {
+            return response.bodyToMono(MapEntryTo.class);
+          }
+          else
+          {
+            return response
+                .<MapEntryTo>createError()
+                .onErrorMap(
+                    WebClientResponseException.class,
+                    e -> new DataPlaneApiErrorException(
+                        e.getStatusCode(),
+                        e.getResponseBodyAs(DataPlaneApiErrorTo.class)));
+          }
+        })
+        .map(entry -> entry.value());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapEntryTo.java
new file mode 100644 (file)
index 0000000..5a98f27
--- /dev/null
@@ -0,0 +1,5 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+public record MapEntryTo(String key, String value)
+{
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/MapInfoTo.java
new file mode 100644 (file)
index 0000000..dce11cc
--- /dev/null
@@ -0,0 +1,5 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+public record MapInfoTo(String id, String file, String description)
+{
+}