From: Kai Moritz Date: Wed, 20 Mar 2024 21:02:09 +0000 (+0100) Subject: fix: GREEN - Dynamic changes to the sharding-map are synced back to disk X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=156cc8e68f0c98e77e944e7db850ab98071c8796;p=demos%2Fkafka%2Fchat fix: GREEN - Dynamic changes to the sharding-map are synced back to disk * HAProxy distinguishes managed and unmanaged maps. ** Only Maps, that are loaded from the directory `maps`, that lives in the config-directory of HAProxy, are treated as managed. ** Only managed maps are listed by default by the Data Plane API. ** And last but not least, only managed maps _can be synced back to disk_, which makes dynamically changes persistent, so that they can survive a restart or reload of HAProxy. * And only managed maps can as well be referenced by their _name_, as by their id. * The name of managed maps is the name of the file, _without_ the path and without the suffix `.map`. * What makes it more difficulte in the case of the HAProxy running inside a docker-container is, that this mechanism only seems to work, if the config-directory is identical to the default `/etc/haproxy`, which _is not_ the case in the default-configuration of the used official image. * Hence, changing the config-directory to the default (in this case) enables managed maps, which is crucial for the setup of the Handover-IT, whicht has to reload the HAProxy-process, in order to let it detect newly added backend-instances, that are otherwise ignored, because the name- resolution inside the docker-network only works _after_ an instance is started -- _without this change_, the dynamically made changes to the sharding-map are lost during the reload, that is performed shortly afterwards. * The refined configuration for HAProxy enables the following refinements of the `HaproxyDataPlaneApiShardingPublisherStrategy`: ** Because the sharding-map is managed now, applied changes can be *synced back to disk* forcibly, which is not possible for unmanaged maps. ** That means: *Dynamically made changes persist a reload/restart!* ** Also, because the sharding-map is managed now, it can be simply referenced by its name `sharding`. ** Therefore, the `HaproxyDataPlaneApiShardingPublisherStrategyIT` does no longer have to detect the ID of the map to be able to reference it. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index ba794523..1d115182 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -48,7 +48,7 @@ public class ChatBackendProperties private int numPartitions = 2; private Duration pollingInterval = Duration.ofSeconds(1); private String haproxyRuntimeApi = "haproxy:8401"; - private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; + private String haproxyMap = "sharding"; } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java index a4270a74..5247e57c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategy.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.haproxy; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; -import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; @@ -10,57 +10,23 @@ import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; -import java.time.Duration; import java.util.Map; +@RequiredArgsConstructor @Slf4j public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy { - public final static String MAPS_PATH = "/services/haproxy/runtime/maps"; - public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged"; public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}"; public final static String MAP_PARAM = "map"; + public final static String FORCE_SYNC_PARAM = "force_sync"; private final WebClient webClient; - @Getter - private final int mapId; + private final String mapName; private final String instanceId; - public HaproxyDataPlaneApiShardingPublisherStrategy( - WebClient webClient, - String mapPath, - String instanceId) - { - this.webClient = webClient; - this.mapId = webClient - .get() - .uri(uriBuilder -> uriBuilder - .path(MAPS_PATH) - .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE) - .build()) - .accept(MediaType.APPLICATION_JSON) - .exchangeToFlux(response -> - { - if (response.statusCode().equals(HttpStatus.OK)) - { - return response.bodyToFlux(MapInfoTo.class); - } - else - { - return response.createError().flux(); - } - }) - .filter(map -> map.file().trim().equals(mapPath)) - .map(map -> map.id()) - .map(id -> Integer.valueOf(id)) - .blockFirst(Duration.ofSeconds(1)); - this.instanceId = instanceId; - } - - @Override public Mono publishOwnership(int shard) { @@ -68,7 +34,8 @@ public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPub .put() .uri(uriBuilder -> uriBuilder .path(MAP_ENTRY_PATH) - .queryParam(MAP_PARAM, "#" + mapId) + .queryParam(MAP_PARAM, mapName) + .queryParam(FORCE_SYNC_PARAM, 1) .build(Map.of("key", shard))) .contentType(MediaType.APPLICATION_JSON) .bodyValue(new MapEntryTo(Integer.toString(shard), instanceId)) diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyIT.java index 41ed2dea..5f49fd5f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyIT.java @@ -57,8 +57,8 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyIT .get() .uri(uriBuilder -> uriBuilder .path(MAP_ENTRY_PATH) - .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId()) - .build(SHARD)) + .queryParam(MAP_PARAM, MAP_NAME) + .build(key)) .accept(MediaType.APPLICATION_JSON) .exchangeToMono(response -> { @@ -91,29 +91,30 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyIT shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy( webClient, - MAP_PATH, + MAP_NAME, INSTANCE_ID); } - static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map"; + static final String MAP_NAME = "sharding"; static final String INSTANCE_ID = "foo"; static final int SHARD = 6; @Container static final GenericContainer HAPROXY = new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8")) + .withCommand("-f", "/etc/haproxy") .withNetwork(Network.newNetwork()) .withNetworkAliases("haproxy") .withCopyFileToContainer( MountableFile.forClasspathResource("haproxy.cfg"), - "/usr/local/etc/haproxy/haproxy.cfg") + "/etc/haproxy/haproxy.cfg") .withCopyFileToContainer( MountableFile.forClasspathResource("dataplaneapi.yml"), - "/usr/local/etc/haproxy/dataplaneapi.yml") + "/etc/haproxy/dataplaneapi.yml") .withCopyFileToContainer( MountableFile.forClasspathResource("sharding.map"), - "/usr/local/etc/haproxy/sharding.map") + "/etc/haproxy/maps/sharding.map") .withExposedPorts(8400, 8401, 8404, 5555) .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY")); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyTest.java index 87f707e5..0accd07c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyDataPlaneApiShardingPublisherStrategyTest.java @@ -22,7 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; @TestWithResources public class HaproxyDataPlaneApiShardingPublisherStrategyTest { - final static String MAP_PATH = "/usr/local/etc/haproxy/sharding.map"; + final static String MAP_NAME = "sharding"; final static String INSTANCE_ID = "backend_3"; final static int SHARD = 4; @@ -30,8 +30,6 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest MockWebServer mockHaproxy; WebClient webClient; - @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json") - String maps__get; @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put.json") String maps_entries__4__put; @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put__error.json") @@ -56,54 +54,11 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest } - @DisplayName("Requests the available maps from HAProxy via the expected path on instanciation") - @Test - void testRequestsMapsFromHaproxyViaTheExpectedPathOnInstanciation() throws InterruptedException - { - // Given - mockHaproxy.enqueue(new MockResponse() - .setStatus("HTTP/1.1 200 OK") - .setBody(maps__get) - .addHeader("Content-Type", "application/json")); - - // When - HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy = - new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID); - - // Then - RecordedRequest recordedRequest = mockHaproxy.takeRequest(1l, TimeUnit.SECONDS); - assertThat(recordedRequest.getPath()) - .isEqualTo("/v2/services/haproxy/runtime/maps?include_unmanaged=true"); - } - - @DisplayName("Detects the expected map-ID on instanciation") - @Test - void testDetectsExpectedIdForMapOnInstanciation() - { - // Given - mockHaproxy.enqueue(new MockResponse() - .setStatus("HTTP/1.1 200 OK") - .setBody(maps__get) - .addHeader("Content-Type", "application/json")); - - // When - HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy = - new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID); - - // Then - assertThat(shardingPublisherStrategy.getMapId()) - .isEqualTo(4); - } - @DisplayName("The expected result is yielded on successful publishing") @Test void testExpectedResultOnSuccessfulPublishing() { // Given - mockHaproxy.enqueue(new MockResponse() - .setStatus("HTTP/1.1 200 OK") - .setBody(maps__get) - .addHeader("Content-Type", "application/json")); mockHaproxy.enqueue(new MockResponse() .setStatus("HTTP/1.1 200 OK") .setBody(maps_entries__4__put) @@ -111,7 +66,7 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest // When HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy = - new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID); + new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID); Mono result = shardingPublisherStrategy.publishOwnership(SHARD); // Then @@ -124,10 +79,6 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest void testExpectedResultOnFailedPublishing() { // Given - mockHaproxy.enqueue(new MockResponse() - .setStatus("HTTP/1.1 200 OK") - .setBody(maps__get) - .addHeader("Content-Type", "application/json")); mockHaproxy.enqueue(new MockResponse() .setStatus("HTTP/1.1 400 Bad Request") .setBody(maps_entries__4__put__error) @@ -135,7 +86,7 @@ public class HaproxyDataPlaneApiShardingPublisherStrategyTest // When HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy = - new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID); + new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID); Mono result = shardingPublisherStrategy .publishOwnership(SHARD) .onErrorResume(throwable -> Mono.just(throwable.getMessage())); diff --git a/src/test/resources/dataplaneapi.yml b/src/test/resources/dataplaneapi.yml index 2dcae5d7..0722e36b 100644 --- a/src/test/resources/dataplaneapi.yml +++ b/src/test/resources/dataplaneapi.yml @@ -17,7 +17,7 @@ dataplaneapi: insecure: true password: juplo haproxy: - config_file: /usr/local/etc/haproxy/haproxy.cfg + config_file: /etc/haproxy/haproxy.cfg haproxy_bin: /usr/sbin/haproxy reload: reload_delay: 5 diff --git a/src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json b/src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json deleted file mode 100644 index a02728bd..00000000 --- a/src/test/resources/de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "description": "pattern loaded from file '/usr/local/etc/haproxy/sharding.map' used by map at file '/usr/local/etc/haproxy/haproxy.cfg' line 27. curr_ver=0 next_ver=0 entry_cnt=10", - "file": "/usr/local/etc/haproxy/sharding.map", - "id": "4" - } -] diff --git a/src/test/resources/haproxy.cfg b/src/test/resources/haproxy.cfg index 72828a94..3d2741e5 100644 --- a/src/test/resources/haproxy.cfg +++ b/src/test/resources/haproxy.cfg @@ -23,7 +23,7 @@ frontend stats frontend frontend bind :8400 default_backend random - use_backend %[req.hdr(X-Shard),map(/usr/local/etc/haproxy/sharding.map)] + use_backend %[req.hdr(X-Shard),map(/etc/haproxy/maps/sharding.map)] backend random server b1 backend-1:8080 check @@ -40,5 +40,5 @@ backend backend_3 server b3 backend-3:8080 check program api - command /usr/bin/dataplaneapi -f /usr/local/etc/haproxy/dataplaneapi.yml + command /usr/bin/dataplaneapi -f /etc/haproxy/dataplaneapi.yml no option start-on-reload