fix: GREEN - Dynamic changes to the sharding-map are synced back to disk
authorKai Moritz <kai@juplo.de>
Wed, 20 Mar 2024 21:02:09 +0000 (22:02 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
* HAProxy distinguishes managed and unmanaged maps.
** Only Maps, that are loaded from the directory `maps`, that lives in the
   config-directory of HAProxy, are treated as managed.
** Only managed maps are listed by default by the Data Plane API.
** And last but not least, only managed maps _can be synced back to disk_,
   which makes dynamically changes persistent, so that they can survive
   a restart or reload of HAProxy.
* And only managed maps can as well be referenced by their _name_, as by
  their id.
* The name of managed maps is the name of the file, _without_ the path and
  without the suffix `.map`.
* What makes it more difficulte in the case of the HAProxy running inside
  a docker-container is, that this mechanism only seems to work, if the
  config-directory is identical to the default `/etc/haproxy`, which _is
  not_ the case in the default-configuration of the used official image.
* Hence, changing the config-directory to the default (in this case)
  enables managed maps, which is crucial for the setup of the Handover-IT,
  whicht has to reload the HAProxy-process, in order to let it detect newly
  added backend-instances, that are otherwise ignored, because the name-
  resolution inside the docker-network only works _after_ an instance is
  started -- _without this change_, the dynamically made changes to the
  sharding-map are lost during the reload, that is performed shortly
  afterwards.
* The refined configuration for HAProxy enables the following refinements
  of the `HaproxyDataPlaneApiShardingPublisherStrategy`:
** Because the sharding-map is managed now, applied changes can be *synced
   back to disk* forcibly, which is not possible for unmanaged maps.
** That means: *Dynamically made changes persist a reload/restart!*
** Also, because the sharding-map is managed now, it can be simply
   referenced by its name `sharding`.
** Therefore, the `HaproxyDataPlaneApiShardingPublisherStrategyIT` does
   no longer have to detect the ID of the map to be able to reference it.

src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java
src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyTest.java
src/test/resources/dataplaneapi.yml
src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json [deleted file]
src/test/resources/haproxy.cfg

index ba79452..1d11518 100644 (file)
@@ -48,7 +48,7 @@ public class ChatBackendProperties
     private int numPartitions = 2;
     private Duration pollingInterval = Duration.ofSeconds(1);
     private String haproxyRuntimeApi = "haproxy:8401";
-    private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
+    private String haproxyMap = "sharding";
   }
 
   @Getter
index a4270a7..5247e57 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.haproxy;
 
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
-import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatusCode;
@@ -10,57 +10,23 @@ import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
 
-import java.time.Duration;
 import java.util.Map;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy
 {
-  public final static String MAPS_PATH = "/services/haproxy/runtime/maps";
-  public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged";
   public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}";
   public final static String MAP_PARAM = "map";
+  public final static String FORCE_SYNC_PARAM = "force_sync";
 
 
   private final WebClient webClient;
-  @Getter
-  private final int mapId;
+  private final String mapName;
   private final String instanceId;
 
 
-  public HaproxyDataPlaneApiShardingPublisherStrategy(
-      WebClient webClient,
-      String mapPath,
-      String instanceId)
-  {
-    this.webClient = webClient;
-    this.mapId = webClient
-        .get()
-        .uri(uriBuilder -> uriBuilder
-            .path(MAPS_PATH)
-            .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE)
-            .build())
-        .accept(MediaType.APPLICATION_JSON)
-        .exchangeToFlux(response ->
-        {
-          if (response.statusCode().equals(HttpStatus.OK))
-          {
-            return response.bodyToFlux(MapInfoTo.class);
-          }
-          else
-          {
-            return response.<MapInfoTo>createError().flux();
-          }
-        })
-        .filter(map -> map.file().trim().equals(mapPath))
-        .map(map -> map.id())
-        .map(id -> Integer.valueOf(id))
-        .blockFirst(Duration.ofSeconds(1));
-    this.instanceId = instanceId;
-  }
-
-
   @Override
   public Mono<String> publishOwnership(int shard)
   {
@@ -68,7 +34,8 @@ public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPub
         .put()
         .uri(uriBuilder -> uriBuilder
             .path(MAP_ENTRY_PATH)
-            .queryParam(MAP_PARAM, "#" + mapId)
+            .queryParam(MAP_PARAM, mapName)
+            .queryParam(FORCE_SYNC_PARAM, 1)
             .build(Map.of("key", shard)))
         .contentType(MediaType.APPLICATION_JSON)
         .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId))
index 41ed2de..5f49fd5 100644 (file)
@@ -57,8 +57,8 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyIT
         .get()
         .uri(uriBuilder -> uriBuilder
             .path(MAP_ENTRY_PATH)
-            .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
-            .build(SHARD))
+            .queryParam(MAP_PARAM, MAP_NAME)
+            .build(key))
         .accept(MediaType.APPLICATION_JSON)
         .exchangeToMono(response ->
         {
@@ -91,29 +91,30 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyIT
 
     shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
         webClient,
-        MAP_PATH,
+        MAP_NAME,
         INSTANCE_ID);
   }
 
 
-  static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
+  static final String MAP_NAME = "sharding";
   static final String INSTANCE_ID = "foo";
   static final int SHARD = 6;
 
   @Container
   static final GenericContainer HAPROXY =
       new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+          .withCommand("-f", "/etc/haproxy")
           .withNetwork(Network.newNetwork())
           .withNetworkAliases("haproxy")
           .withCopyFileToContainer(
               MountableFile.forClasspathResource("haproxy.cfg"),
-              "/usr/local/etc/haproxy/haproxy.cfg")
+              "/etc/haproxy/haproxy.cfg")
           .withCopyFileToContainer(
               MountableFile.forClasspathResource("dataplaneapi.yml"),
-              "/usr/local/etc/haproxy/dataplaneapi.yml")
+              "/etc/haproxy/dataplaneapi.yml")
           .withCopyFileToContainer(
               MountableFile.forClasspathResource("sharding.map"),
-              "/usr/local/etc/haproxy/sharding.map")
+              "/etc/haproxy/maps/sharding.map")
           .withExposedPorts(8400, 8401, 8404, 5555)
           .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
 }
index 87f707e..0accd07 100644 (file)
@@ -22,7 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @TestWithResources
 public class HaproxyDataPlaneApiShardingPublisherStrategyTest
 {
-  final static String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
+  final static String MAP_NAME = "sharding";
   final static String INSTANCE_ID = "backend_3";
   final static int SHARD = 4;
 
@@ -30,8 +30,6 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest
   MockWebServer mockHaproxy;
   WebClient webClient;
 
-  @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json")
-  String maps__get;
   @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put.json")
   String maps_entries__4__put;
   @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put__error.json")
@@ -56,54 +54,11 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest
   }
 
 
-  @DisplayName("Requests the available maps from HAProxy via the expected path on instanciation")
-  @Test
-  void testRequestsMapsFromHaproxyViaTheExpectedPathOnInstanciation() throws InterruptedException
-  {
-    // Given
-    mockHaproxy.enqueue(new MockResponse()
-        .setStatus("HTTP/1.1 200 OK")
-        .setBody(maps__get)
-        .addHeader("Content-Type", "application/json"));
-
-    // When
-    HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
-        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
-
-    // Then
-    RecordedRequest recordedRequest = mockHaproxy.takeRequest(1l, TimeUnit.SECONDS);
-    assertThat(recordedRequest.getPath())
-        .isEqualTo("/v2/services/haproxy/runtime/maps?include_unmanaged=true");
-  }
-
-  @DisplayName("Detects the expected map-ID on instanciation")
-  @Test
-  void testDetectsExpectedIdForMapOnInstanciation()
-  {
-    // Given
-    mockHaproxy.enqueue(new MockResponse()
-        .setStatus("HTTP/1.1 200 OK")
-        .setBody(maps__get)
-        .addHeader("Content-Type", "application/json"));
-
-    // When
-    HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
-        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
-
-    // Then
-    assertThat(shardingPublisherStrategy.getMapId())
-        .isEqualTo(4);
-  }
-
   @DisplayName("The expected result is yielded on successful publishing")
   @Test
   void testExpectedResultOnSuccessfulPublishing()
   {
     // Given
-    mockHaproxy.enqueue(new MockResponse()
-        .setStatus("HTTP/1.1 200 OK")
-        .setBody(maps__get)
-        .addHeader("Content-Type", "application/json"));
     mockHaproxy.enqueue(new MockResponse()
         .setStatus("HTTP/1.1 200 OK")
         .setBody(maps_entries__4__put)
@@ -111,7 +66,7 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest
 
     // When
     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
-        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
+        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID);
     Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
 
     // Then
@@ -124,10 +79,6 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest
   void testExpectedResultOnFailedPublishing()
   {
     // Given
-    mockHaproxy.enqueue(new MockResponse()
-        .setStatus("HTTP/1.1 200 OK")
-        .setBody(maps__get)
-        .addHeader("Content-Type", "application/json"));
     mockHaproxy.enqueue(new MockResponse()
         .setStatus("HTTP/1.1 400 Bad Request")
         .setBody(maps_entries__4__put__error)
@@ -135,7 +86,7 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest
 
     // When
     HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
-        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
+        new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID);
     Mono<String> result = shardingPublisherStrategy
         .publishOwnership(SHARD)
         .onErrorResume(throwable -> Mono.just(throwable.getMessage()));
index 2dcae5d..0722e36 100644 (file)
@@ -17,7 +17,7 @@ dataplaneapi:
     insecure: true
     password: juplo
 haproxy:
-  config_file: /usr/local/etc/haproxy/haproxy.cfg
+  config_file: /etc/haproxy/haproxy.cfg
   haproxy_bin: /usr/sbin/haproxy
   reload:
     reload_delay: 5
diff --git a/src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json b/src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json
deleted file mode 100644 (file)
index a02728b..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-[
-    {
-        "description": "pattern loaded from file '/usr/local/etc/haproxy/sharding.map' used by map at file '/usr/local/etc/haproxy/haproxy.cfg' line 27. curr_ver=0 next_ver=0 entry_cnt=10",
-        "file": "/usr/local/etc/haproxy/sharding.map",
-        "id": "4"
-    }
-]
index 72828a9..3d2741e 100644 (file)
@@ -23,7 +23,7 @@ frontend stats
 frontend frontend
   bind :8400
   default_backend random
-  use_backend %[req.hdr(X-Shard),map(/usr/local/etc/haproxy/sharding.map)]
+  use_backend %[req.hdr(X-Shard),map(/etc/haproxy/maps/sharding.map)]
 
 backend random
   server b1 backend-1:8080 check
@@ -40,5 +40,5 @@ backend backend_3
   server b3 backend-3:8080 check
 
 program api
-  command /usr/bin/dataplaneapi -f /usr/local/etc/haproxy/dataplaneapi.yml
+  command /usr/bin/dataplaneapi -f /etc/haproxy/dataplaneapi.yml
   no option start-on-reload