* HAProxy distinguishes between managed and unmanaged maps.
** Only maps that are loaded from the directory `maps`, which lives in the
config directory of HAProxy, are treated as managed.
** By default, the Data Plane API only lists managed maps.
** And last but not least, only managed maps _can be synced back to disk_,
which makes dynamic changes persistent, so that they survive
a restart or reload of HAProxy.
* Furthermore, only managed maps can be referenced by their _name_ as well as
by their ID.
* The name of a managed map is the name of its file, _without_ the path and
without the suffix `.map` -- for example, the file `maps/sharding.map` is
simply addressed as `sharding`.
* What makes this more difficult when HAProxy runs inside a Docker container
is that this mechanism only seems to work if the config directory is identical
to the default `/etc/haproxy`, which _is not_ the case in the default
configuration of the official image that is used here.
* Hence, changing the config directory to the default (in this case) enables
managed maps, which is crucial for the setup of the Handover-IT: the IT has to
reload the HAProxy process in order to let it detect newly added backend
instances, which are otherwise ignored, because the name resolution inside the
Docker network only works _after_ an instance is started -- _without this
change_, the dynamically made changes to the sharding map would be lost during
the reload that is performed shortly afterwards.
* The refined configuration for HAProxy enables the following improvements to
the `HaproxyDataPlaneApiShardingPublisherStrategy`:
** Because the sharding map is managed now, applied changes can be forcibly
*synced back to disk*, which is not possible for unmanaged maps.
** That means: *dynamically made changes survive a reload/restart!*
** Also, because the sharding map is managed now, it can simply be referenced
by its name `sharding`.
** Therefore, the `HaproxyDataPlaneApiShardingPublisherStrategyIT` no longer
has to detect the ID of the map in order to reference it -- see the usage
sketch below.
private int numPartitions = 2;
private Duration pollingInterval = Duration.ofSeconds(1);
private String haproxyRuntimeApi = "haproxy:8401";
- private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
+ private String haproxyMap = "sharding";
}
@Getter
package de.juplo.kafka.chat.backend.implementation.haproxy;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
-import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
-import java.time.Duration;
import java.util.Map;
+@RequiredArgsConstructor
@Slf4j
public class HaproxyDataPlaneApiShardingPublisherStrategy implements ShardingPublisherStrategy
{
- public final static String MAPS_PATH = "/services/haproxy/runtime/maps";
- public final static String INCLUDE_UNMANAGED_PARAM = "include_unmanaged";
public final static String MAP_ENTRY_PATH = "/services/haproxy/runtime/maps_entries/{key}";
public final static String MAP_PARAM = "map";
+ public final static String FORCE_SYNC_PARAM = "force_sync";
private final WebClient webClient;
- @Getter
- private final int mapId;
+ private final String mapName;
private final String instanceId;
- public HaproxyDataPlaneApiShardingPublisherStrategy(
- WebClient webClient,
- String mapPath,
- String instanceId)
- {
- this.webClient = webClient;
- this.mapId = webClient
- .get()
- .uri(uriBuilder -> uriBuilder
- .path(MAPS_PATH)
- .queryParam(INCLUDE_UNMANAGED_PARAM, Boolean.TRUE)
- .build())
- .accept(MediaType.APPLICATION_JSON)
- .exchangeToFlux(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToFlux(MapInfoTo.class);
- }
- else
- {
- return response.<MapInfoTo>createError().flux();
- }
- })
- .filter(map -> map.file().trim().equals(mapPath))
- .map(map -> map.id())
- .map(id -> Integer.valueOf(id))
- .blockFirst(Duration.ofSeconds(1));
- this.instanceId = instanceId;
- }
-
-
@Override
public Mono<String> publishOwnership(int shard)
{
.put()
.uri(uriBuilder -> uriBuilder
.path(MAP_ENTRY_PATH)
- .queryParam(MAP_PARAM, "#" + mapId)
+ .queryParam(MAP_PARAM, mapName)
+ .queryParam(FORCE_SYNC_PARAM, 1)
.build(Map.of("key", shard)))
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(new MapEntryTo(Integer.toString(shard), instanceId))
.get()
.uri(uriBuilder -> uriBuilder
.path(MAP_ENTRY_PATH)
- .queryParam(MAP_PARAM, "#" + shardingPublisherStrategy.getMapId())
- .build(SHARD))
+ .queryParam(MAP_PARAM, MAP_NAME)
+ .build(key))
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response ->
{
shardingPublisherStrategy = new HaproxyDataPlaneApiShardingPublisherStrategy(
webClient,
- MAP_PATH,
+ MAP_NAME,
INSTANCE_ID);
}
- static final String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
+ static final String MAP_NAME = "sharding";
static final String INSTANCE_ID = "foo";
static final int SHARD = 6;
@Container
static final GenericContainer HAPROXY =
new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+ .withCommand("-f", "/etc/haproxy")
.withNetwork(Network.newNetwork())
.withNetworkAliases("haproxy")
.withCopyFileToContainer(
MountableFile.forClasspathResource("haproxy.cfg"),
- "/usr/local/etc/haproxy/haproxy.cfg")
+ "/etc/haproxy/haproxy.cfg")
.withCopyFileToContainer(
MountableFile.forClasspathResource("dataplaneapi.yml"),
- "/usr/local/etc/haproxy/dataplaneapi.yml")
+ "/etc/haproxy/dataplaneapi.yml")
.withCopyFileToContainer(
MountableFile.forClasspathResource("sharding.map"),
- "/usr/local/etc/haproxy/sharding.map")
+ "/etc/haproxy/maps/sharding.map")
.withExposedPorts(8400, 8401, 8404, 5555)
.withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
}
@TestWithResources
public class HaproxyDataPlaneApiShardingPublisherStrategyTest
{
- final static String MAP_PATH = "/usr/local/etc/haproxy/sharding.map";
+ final static String MAP_NAME = "sharding";
final static String INSTANCE_ID = "backend_3";
final static int SHARD = 4;
MockWebServer mockHaproxy;
WebClient webClient;
- @GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps__get.json")
- String maps__get;
@GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put.json")
String maps_entries__4__put;
@GivenTextResource("de/juplo/kafka/chat/backend/implementation/haproxy/maps_entries__4__put__error.json")
}
- @DisplayName("Requests the available maps from HAProxy via the expected path on instanciation")
- @Test
- void testRequestsMapsFromHaproxyViaTheExpectedPathOnInstanciation() throws InterruptedException
- {
- // Given
- mockHaproxy.enqueue(new MockResponse()
- .setStatus("HTTP/1.1 200 OK")
- .setBody(maps__get)
- .addHeader("Content-Type", "application/json"));
-
- // When
- HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
- new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
-
- // Then
- RecordedRequest recordedRequest = mockHaproxy.takeRequest(1l, TimeUnit.SECONDS);
- assertThat(recordedRequest.getPath())
- .isEqualTo("/v2/services/haproxy/runtime/maps?include_unmanaged=true");
- }
-
- @DisplayName("Detects the expected map-ID on instanciation")
- @Test
- void testDetectsExpectedIdForMapOnInstanciation()
- {
- // Given
- mockHaproxy.enqueue(new MockResponse()
- .setStatus("HTTP/1.1 200 OK")
- .setBody(maps__get)
- .addHeader("Content-Type", "application/json"));
-
- // When
- HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
- new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
-
- // Then
- assertThat(shardingPublisherStrategy.getMapId())
- .isEqualTo(4);
- }
-
@DisplayName("The expected result is yielded on successful publishing")
@Test
void testExpectedResultOnSuccessfulPublishing()
{
// Given
- mockHaproxy.enqueue(new MockResponse()
- .setStatus("HTTP/1.1 200 OK")
- .setBody(maps__get)
- .addHeader("Content-Type", "application/json"));
mockHaproxy.enqueue(new MockResponse()
.setStatus("HTTP/1.1 200 OK")
.setBody(maps_entries__4__put)
// When
HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
- new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
+ new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID);
Mono<String> result = shardingPublisherStrategy.publishOwnership(SHARD);
// Then
void testExpectedResultOnFailedPublishing()
{
// Given
- mockHaproxy.enqueue(new MockResponse()
- .setStatus("HTTP/1.1 200 OK")
- .setBody(maps__get)
- .addHeader("Content-Type", "application/json"));
mockHaproxy.enqueue(new MockResponse()
.setStatus("HTTP/1.1 400 Bad Request")
.setBody(maps_entries__4__put__error)
// When
HaproxyDataPlaneApiShardingPublisherStrategy shardingPublisherStrategy =
- new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_PATH, INSTANCE_ID);
+ new HaproxyDataPlaneApiShardingPublisherStrategy(webClient, MAP_NAME, INSTANCE_ID);
Mono<String> result = shardingPublisherStrategy
.publishOwnership(SHARD)
.onErrorResume(throwable -> Mono.just(throwable.getMessage()));
insecure: true
password: juplo
haproxy:
- config_file: /usr/local/etc/haproxy/haproxy.cfg
+ config_file: /etc/haproxy/haproxy.cfg
haproxy_bin: /usr/sbin/haproxy
reload:
reload_delay: 5
+++ /dev/null
-[
- {
- "description": "pattern loaded from file '/usr/local/etc/haproxy/sharding.map' used by map at file '/usr/local/etc/haproxy/haproxy.cfg' line 27. curr_ver=0 next_ver=0 entry_cnt=10",
- "file": "/usr/local/etc/haproxy/sharding.map",
- "id": "4"
- }
-]
frontend frontend
bind :8400
default_backend random
- use_backend %[req.hdr(X-Shard),map(/usr/local/etc/haproxy/sharding.map)]
+ use_backend %[req.hdr(X-Shard),map(/etc/haproxy/maps/sharding.map)]
backend random
server b1 backend-1:8080 check
server b3 backend-3:8080 check
program api
- command /usr/bin/dataplaneapi -f /usr/local/etc/haproxy/dataplaneapi.yml
+ command /usr/bin/dataplaneapi -f /etc/haproxy/dataplaneapi.yml
no option start-on-reload