+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import pl.rzrz.assertj.reactor.Assertions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-
-@Slf4j
-public abstract class AbstractHandoverIT
-{
- static final int NUM_CHATROOMS = 23;
- static final int NUM_CLIENTS = 17;
-
-
- private final AbstractHandoverITContainers containers;
-
-
- AbstractHandoverIT(AbstractHandoverITContainers containers)
- {
- this.containers = containers;
- }
-
-
- @Test
- void test() throws InterruptedException
- {
- log.info("Starting backend-1...");
- containers.startBackend(containers.backend1, new TestWriter[0]);
- log.info("backend-1 started!");
-
- ChatRoomInfoTo[] chatRooms = Flux
- .range(0, NUM_CHATROOMS)
- .flatMap(i -> createChatRoom("room-" + i))
- .toStream()
- .toArray(size -> new ChatRoomInfoTo[size]);
-
- int port = containers.haproxy.getMappedPort(8400);
-
- CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
- TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
- for (int i = 0; i < NUM_CLIENTS; i++)
- {
- TestWriter testWriter = new TestWriter(
- port,
- chatRooms[i % NUM_CHATROOMS],
- "user-" + i);
- testWriters[i] = testWriter;
- testWriterFutures[i] = testWriter
- .run()
- .toFuture();
- }
-
- TestListener testListener = new TestListener(port, chatRooms);
- testListener
- .run()
- .subscribe(message -> log.info(
- "Received message: {}",
- message));
-
- log.info("Starting backend-2...");
- containers.startBackend(containers.backend2, testWriters);
- log.info("backend-2 started!");
-
- log.info("Starting backend-3...");
- containers.startBackend(containers.backend3, testWriters);
- log.info("backend-3 started!");
-
- for (int i = 0; i < NUM_CLIENTS; i++)
- {
- testWriters[i].running = false;
- testWriterFutures[i].join();
- log.info("Joined TestWriter {}", testWriters[i].user);
- }
-
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener));
- }
-
- private void assertAllSentMessagesReceived(
- TestWriter[] testWriters,
- TestListener testListener)
- {
- for (int i = 0; i < NUM_CLIENTS; i++)
- {
- TestWriter testWriter = testWriters[i];
- ChatRoomInfoTo chatRoom = testWriter.chatRoom;
- List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
-
- Assertions.assertThat(receivedMessages
- .stream()
- .filter(message -> message.getUser().equals(testWriter.user.getName()))
- ).containsExactlyElementsOf(testWriter.sentMessages);
- }
- }
-
- Mono<ChatRoomInfoTo> createChatRoom(String name)
- {
- return webClient
- .post()
- .uri("/create")
- .contentType(MediaType.TEXT_PLAIN)
- .bodyValue(name)
- .accept(MediaType.APPLICATION_JSON)
- .exchangeToMono(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToMono(ChatRoomInfoTo.class);
- }
- else
- {
- return response.createError();
- }
- });
- }
-
-
- WebClient webClient;
-
- @BeforeEach
- void setUp() throws Exception
- {
- containers.setUp();
-
- Integer port = containers.haproxy.getMappedPort(8400);
- webClient = WebClient.create("http://localhost:" + port);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
-import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.HttpStatusCode;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.images.ImagePullPolicy;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Arrays;
-
-import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
-
-
-@Slf4j
-public abstract class AbstractHandoverITContainers
-{
- static final ImagePullPolicy NEVER_PULL = imageName -> false;
-
-
- final Network network = Network.newNetwork();
- final GenericContainer haproxy, backend1, backend2, backend3;
-
-
- AbstractHandoverITContainers()
- {
- haproxy = createHaproxyContainer();
- haproxy.start();
-
- backend1 = createBackendContainer("1");
- backend2 = createBackendContainer("2");
- backend3 = createBackendContainer("3");
- }
-
-
- void setUpExtra() throws Exception
- {
- log.info("This setup does not need any extra containers");
- }
-
- void setUp() throws Exception
- {
- setUpExtra();
- }
-
- void startBackend(
- GenericContainer backend,
- TestWriter[] testWriters)
- {
- backend.start();
-
- int[] numSentMessages = Arrays
- .stream(testWriters)
- .mapToInt(testWriter -> testWriter.getNumSentMessages())
- .toArray();
-
- String backendUri = "http://localhost:" + backend.getMappedPort(8080);
-
- Instant before, after;
-
- before = Instant.now();
- HttpStatusCode statusCode = WebClient
- .create(backendUri)
- .get()
- .uri("/actuator/health")
- .exchangeToMono(response ->
- {
- log.info("{} responded with {}", backendUri, response.statusCode());
- return Mono.just(response.statusCode());
- })
- .flatMap(status -> switch (status.value())
- {
- case 200, 503 -> Mono.just(status);
- default -> Mono.error(new RuntimeException(status.toString()));
- })
- .retryWhen(Retry.backoff(30, Duration.ofSeconds(1)))
- .block();
- after = Instant.now();
- log.info("Took {} to reach status {}", Duration.between(before, after), statusCode);
-
- before = Instant.now();
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(45))
- .until(() -> WebClient
- .create(backendUri)
- .get()
- .uri("/actuator/health")
- .exchangeToMono(response ->
- {
- log.info("{} responded with {}", backendUri, response.statusCode());
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response
- .bodyToMono(StatusTo.class)
- .map(StatusTo::getStatus)
- .map(status -> status.equalsIgnoreCase("UP"));
- }
- else
- {
- return Mono.just(false);
- }
- })
- .block());
- after = Instant.now();
- log.info("Took {} until the backend reported status UP", Duration.between(before, after));
-
- haproxy
- .getDockerClient()
- .killContainerCmd(haproxy.getContainerId())
- .withSignal("HUP")
- .exec();
-
- before = Instant.now();
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(15))
- .until(() -> WebClient
- .create("http://localhost:" + haproxy.getMappedPort(8400))
- .get()
- .uri("/actuator/health")
- .exchangeToMono(response ->
- {
- log.info("{} responded with {}", backendUri, response.statusCode());
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response
- .bodyToMono(StatusTo.class)
- .map(StatusTo::getStatus)
- .map(status -> status.equalsIgnoreCase("UP"));
- }
- else
- {
- return Mono.just(false);
- }
- })
- .block());
- after = Instant.now();
- log.info("Took {} until haproxy reported status UP", Duration.between(before, after));
-
- before = Instant.now();
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(30))
- .until(() ->
- {
- for (int i = 0; i < testWriters.length; i++)
- {
- TestWriter testWriter = testWriters[i];
- int sentTotal = testWriter.getNumSentMessages();
- if (sentTotal == numSentMessages[i])
- {
- log.info(
- "No progress for {}: sent-before={}, sent-total={}, map: {}",
- testWriter,
- numSentMessages[i],
- sentTotal,
- readHaproxyMap());
- return false;
- }
- }
-
- return true;
- });
- after = Instant.now();
- log.info("Took {} until all writers made some progress", Duration.between(before, after));
- }
-
- abstract String[] getBackendCommand();
-
- final GenericContainer createHaproxyContainer()
- {
- return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
- .withCommand("-f", "/etc/haproxy")
- .withNetwork(network)
- .withNetworkAliases("haproxy")
- .withCopyFileToContainer(
- MountableFile.forClasspathResource("haproxy.cfg"),
- "/etc/haproxy/haproxy.cfg")
- .withCopyFileToContainer(
- MountableFile.forClasspathResource("dataplaneapi.yml"),
- "/etc/haproxy/dataplaneapi.yml")
- .withCopyFileToContainer(
- MountableFile.forClasspathResource("sharding.map"),
- "/etc/haproxy/maps/sharding.map")
- .withExposedPorts(8400, 8401, 8404, 5555)
- .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
- }
-
- final GenericContainer createBackendContainer(String id)
- {
- return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
- .withImagePullPolicy(NEVER_PULL)
- .withNetwork(network)
- .withNetworkAliases("backend-ID".replaceAll("ID", id))
- .withCommand(Arrays.stream(getBackendCommand())
- .map(commandPart -> commandPart.replaceAll("ID", id))
- .toArray(size -> new String[size]))
- .withExposedPorts(8080)
- .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
- .withLogConsumer(new Slf4jLogConsumer(
- log,
- true
- )
- .withPrefix("BACKEND-ID".replaceAll("ID", id)));
- }
-
- private String readHaproxyMap()
- {
- return createHaproxyWebClient()
- .get()
- .uri(uriBuilder -> uriBuilder
- .path("/services/haproxy/runtime/maps_entries")
- .queryParam(MAP_PARAM, MAP_NAME)
- .build())
- .accept(MediaType.APPLICATION_JSON)
- .exchangeToFlux(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToFlux(MapEntryTo.class);
- }
- else
- {
- return response.<MapEntryTo>createError().flux();
- }
- })
- .map(entry -> entry.key() + "=" + entry.value())
- .reduce((a, b) -> a + ", " + b)
- .block();
- }
-
- private WebClient createHaproxyWebClient()
- {
- return WebClient
- .builder()
- .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
- .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
- .build();
- }
-
- static final String MAP_NAME = "sharding";
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-@Slf4j
-class KafkaHandoverIT extends AbstractHandoverIT
-{
- KafkaHandoverIT()
- {
- super(new KafkaHandoverITContainers());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.DockerImageName;
-
-import java.io.IOException;
-
-
-@Slf4j
-class KafkaHandoverITContainers extends AbstractHandoverITContainers
-{
- @Override
- void setUpExtra() throws IOException, InterruptedException
- {
- kafka.start();
-
- Container.ExecResult result;
- result = kafka.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "info_channel",
- "--partitions",
- "3");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
- result = kafka.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "data_channel",
- "--partitions",
- "10");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
- }
-
-
- private final KafkaContainer kafka =
- new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
- .withNetwork(network)
- .withNetworkAliases("kafka")
- .withListener(() -> "kafka:9999")
- .withKraft()
- .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
- .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
-
-
- @Override
- String[] getBackendCommand()
- {
- return new String[]
- {
- "--chat.backend.instance-id=backend_ID",
- "--chat.backend.services=kafka",
- "--chat.backend.kafka.bootstrap-servers=kafka:9999",
- "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
- "--chat.backend.kafka.num-partitions=10",
- "--chat.backend.kafka.client-id-prefix=BID",
- "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/",
- "--chat.backend.kafka.haproxy-user=juplo",
- "--chat.backend.kafka.haproxy-password=juplo",
- "--chat.backend.kafka.haproxy-map=sharding",
- "--logging.level.de.juplo=DEBUG"
- };
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.Getter;
-import lombok.Setter;
-
-
-
-@Getter
-@Setter
-class StatusTo
-{
- String status;
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.ParameterizedTypeReference;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.*;
-
-
-@Slf4j
-public class TestListener
-{
- static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
-
-
- public Flux<MessageTo> run()
- {
- return Flux
- .fromArray(chatRooms)
- .flatMap(chatRoom ->
- {
- List<MessageTo> list = new LinkedList<>();
- receivedMessages.put(chatRoom.getId(), list);
- return receiveMessages(chatRoom);
- });
- }
-
- Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
- {
- log.info("Requesting messages for chat-room {}", chatRoom);
- List<MessageTo> list = receivedMessages.get(chatRoom.getId());
- return receiveServerSentEvents(chatRoom)
- .flatMap(sse ->
- {
- try
- {
- return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
- }
- catch (Exception e)
- {
- return Mono.error(e);
- }
- })
- .doOnNext(message -> list.add(message))
- .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
- .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe))
- .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
- }
-
- Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
- {
- return webClient
- .get()
- .uri(
- "/{chatRoomId}/listen",
- chatRoom.getId())
- .accept(MediaType.TEXT_EVENT_STREAM)
- .header("X-Shard", chatRoom.getShard().toString())
- .retrieve()
- .bodyToFlux(SSE_TYPE)
- .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
- }
-
-
- private final WebClient webClient;
- private final ChatRoomInfoTo[] chatRooms;
- private final ObjectMapper objectMapper;
-
- final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
-
-
- TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
- {
- webClient = WebClient.create("http://localhost:" + port);
- this.chatRooms = chatRooms;
- objectMapper = new ObjectMapper();
- objectMapper.registerModule(new JavaTimeModule());
- objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.Getter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.nio.charset.Charset;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-
-@ToString(of = "user")
-@Slf4j
-public class TestWriter
-{
- public Mono<Void> run()
- {
- return Flux
- .fromIterable((Iterable<Integer>) () -> new Iterator<>()
- {
- private int i = 0;
-
- @Override
- public boolean hasNext()
- {
- return running;
- }
-
- @Override
- public Integer next()
- {
- return i++;
- }
- })
- .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
- .map(i -> "Message #" + i)
- .concatMap(message -> sendMessage(chatRoom, message)
- .doOnError(throwable ->
- {
- WebClientResponseException e = (WebClientResponseException)throwable;
- log.info(
- "Could not sent message {} for {}: {}",
- message,
- user.getName(),
- e.getResponseBodyAsString());
- })
- .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1))))
- .doOnNext(message ->
- {
- numSentMessages++;
- sentMessages.add(message);
- log.info(
- "{} sent a message to {}: {}",
- user,
- chatRoom,
- message);
- })
- .doOnError(throwable ->
- {
- WebClientResponseException e = (WebClientResponseException)throwable.getCause();
- log.error(
- "{} failed sending a message: {}",
- user,
- e.getResponseBodyAsString(Charset.defaultCharset()));
- })
- .takeUntil(message -> !running)
- .doOnComplete(() -> log.info("TestWriter {} is done", user))
- .then();
- }
-
- private Mono<MessageTo> sendMessage(
- ChatRoomInfoTo chatRoom,
- String message)
- {
- return webClient
- .put()
- .uri(
- "/{chatRoomId}/{username}/{serial}",
- chatRoom.getId(),
- user.getName(),
- user.nextSerial())
- .contentType(MediaType.TEXT_PLAIN)
- .accept(MediaType.APPLICATION_JSON)
- .header("X-Shard", chatRoom.getShard().toString())
- .bodyValue(message)
- .exchangeToMono(response ->
- {
- if (response.statusCode().equals(HttpStatus.OK))
- {
- return response.bodyToMono(MessageTo.class);
- }
- else
- {
- return response.createError();
- }
- });
- }
-
-
- private final WebClient webClient;
-
- final ChatRoomInfoTo chatRoom;
- final User user;
- final List<MessageTo> sentMessages = new LinkedList<>();
-
- volatile boolean running = true;
- @Getter
- private volatile int numSentMessages = 0;
-
-
- TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
- {
- webClient = WebClient.create("http://localhost:" + port);
- this.chatRoom = chatRoom;
- user = new User(username);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-
-
-@EqualsAndHashCode(of = "name")
-@ToString(of = "name")
-class User
-{
- @Getter
- private final String name;
- private int serial = 0;
-
-
- User (String name)
- {
- this.name = name;
- }
-
-
- int nextSerial()
- {
- return ++serial;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import pl.rzrz.assertj.reactor.Assertions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@Slf4j
+public abstract class AbstractHandoverIT
+{
+ static final int NUM_CHATROOMS = 23;
+ static final int NUM_CLIENTS = 17;
+
+
+ private final AbstractHandoverITContainers containers;
+
+
+ AbstractHandoverIT(AbstractHandoverITContainers containers)
+ {
+ this.containers = containers;
+ }
+
+
+ @Test
+ void test() throws InterruptedException
+ {
+ log.info("Starting backend-1...");
+ containers.startBackend(containers.backend1, new TestWriter[0]);
+ log.info("backend-1 started!");
+
+ ChatRoomInfoTo[] chatRooms = Flux
+ .range(0, NUM_CHATROOMS)
+ .flatMap(i -> createChatRoom("room-" + i))
+ .toStream()
+ .toArray(size -> new ChatRoomInfoTo[size]);
+
+ int port = containers.haproxy.getMappedPort(8400);
+
+ CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+ TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = new TestWriter(
+ port,
+ chatRooms[i % NUM_CHATROOMS],
+ "user-" + i);
+ testWriters[i] = testWriter;
+ testWriterFutures[i] = testWriter
+ .run()
+ .toFuture();
+ }
+
+ TestListener testListener = new TestListener(port, chatRooms);
+ testListener
+ .run()
+ .subscribe(message -> log.info(
+ "Received message: {}",
+ message));
+
+ log.info("Starting backend-2...");
+ containers.startBackend(containers.backend2, testWriters);
+ log.info("backend-2 started!");
+
+ log.info("Starting backend-3...");
+ containers.startBackend(containers.backend3, testWriters);
+ log.info("backend-3 started!");
+
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ testWriters[i].running = false;
+ testWriterFutures[i].join();
+ log.info("Joined TestWriter {}", testWriters[i].user);
+ }
+
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener));
+ }
+
+ private void assertAllSentMessagesReceived(
+ TestWriter[] testWriters,
+ TestListener testListener)
+ {
+ for (int i = 0; i < NUM_CLIENTS; i++)
+ {
+ TestWriter testWriter = testWriters[i];
+ ChatRoomInfoTo chatRoom = testWriter.chatRoom;
+ List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
+
+ Assertions.assertThat(receivedMessages
+ .stream()
+ .filter(message -> message.getUser().equals(testWriter.user.getName()))
+ ).containsExactlyElementsOf(testWriter.sentMessages);
+ }
+ }
+
+ Mono<ChatRoomInfoTo> createChatRoom(String name)
+ {
+ return webClient
+ .post()
+ .uri("/create")
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(name)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToMono(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToMono(ChatRoomInfoTo.class);
+ }
+ else
+ {
+ return response.createError();
+ }
+ });
+ }
+
+
+ WebClient webClient;
+
+ @BeforeEach
+ void setUp() throws Exception
+ {
+ containers.setUp();
+
+ Integer port = containers.haproxy.getMappedPort(8400);
+ webClient = WebClient.create("http://localhost:" + port);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.ImagePullPolicy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+
+import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
+
+
+@Slf4j
+public abstract class AbstractHandoverITContainers
+{
+ static final ImagePullPolicy NEVER_PULL = imageName -> false;
+
+
+ final Network network = Network.newNetwork();
+ final GenericContainer haproxy, backend1, backend2, backend3;
+
+
+ AbstractHandoverITContainers()
+ {
+ haproxy = createHaproxyContainer();
+ haproxy.start();
+
+ backend1 = createBackendContainer("1");
+ backend2 = createBackendContainer("2");
+ backend3 = createBackendContainer("3");
+ }
+
+
+ void setUpExtra() throws Exception
+ {
+ log.info("This setup does not need any extra containers");
+ }
+
+ void setUp() throws Exception
+ {
+ setUpExtra();
+ }
+
+ void startBackend(
+ GenericContainer backend,
+ TestWriter[] testWriters)
+ {
+ backend.start();
+
+ int[] numSentMessages = Arrays
+ .stream(testWriters)
+ .mapToInt(testWriter -> testWriter.getNumSentMessages())
+ .toArray();
+
+ String backendUri = "http://localhost:" + backend.getMappedPort(8080);
+
+ Instant before, after;
+
+ before = Instant.now();
+ HttpStatusCode statusCode = WebClient
+ .create(backendUri)
+ .get()
+ .uri("/actuator/health")
+ .exchangeToMono(response ->
+ {
+ log.info("{} responded with {}", backendUri, response.statusCode());
+ return Mono.just(response.statusCode());
+ })
+ .flatMap(status -> switch (status.value())
+ {
+ case 200, 503 -> Mono.just(status);
+ default -> Mono.error(new RuntimeException(status.toString()));
+ })
+ .retryWhen(Retry.backoff(30, Duration.ofSeconds(1)))
+ .block();
+ after = Instant.now();
+ log.info("Took {} to reach status {}", Duration.between(before, after), statusCode);
+
+ before = Instant.now();
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(45))
+ .until(() -> WebClient
+ .create(backendUri)
+ .get()
+ .uri("/actuator/health")
+ .exchangeToMono(response ->
+ {
+ log.info("{} responded with {}", backendUri, response.statusCode());
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response
+ .bodyToMono(StatusTo.class)
+ .map(StatusTo::getStatus)
+ .map(status -> status.equalsIgnoreCase("UP"));
+ }
+ else
+ {
+ return Mono.just(false);
+ }
+ })
+ .block());
+ after = Instant.now();
+ log.info("Took {} until the backend reported status UP", Duration.between(before, after));
+
+ haproxy
+ .getDockerClient()
+ .killContainerCmd(haproxy.getContainerId())
+ .withSignal("HUP")
+ .exec();
+
+ before = Instant.now();
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> WebClient
+ .create("http://localhost:" + haproxy.getMappedPort(8400))
+ .get()
+ .uri("/actuator/health")
+ .exchangeToMono(response ->
+ {
+ log.info("{} responded with {}", backendUri, response.statusCode());
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response
+ .bodyToMono(StatusTo.class)
+ .map(StatusTo::getStatus)
+ .map(status -> status.equalsIgnoreCase("UP"));
+ }
+ else
+ {
+ return Mono.just(false);
+ }
+ })
+ .block());
+ after = Instant.now();
+ log.info("Took {} until haproxy reported status UP", Duration.between(before, after));
+
+ before = Instant.now();
+ Awaitility
+ .await()
+ .atMost(Duration.ofSeconds(30))
+ .until(() ->
+ {
+ for (int i = 0; i < testWriters.length; i++)
+ {
+ TestWriter testWriter = testWriters[i];
+ int sentTotal = testWriter.getNumSentMessages();
+ if (sentTotal == numSentMessages[i])
+ {
+ log.info(
+ "No progress for {}: sent-before={}, sent-total={}, map: {}",
+ testWriter,
+ numSentMessages[i],
+ sentTotal,
+ readHaproxyMap());
+ return false;
+ }
+ }
+
+ return true;
+ });
+ after = Instant.now();
+ log.info("Took {} until all writers made some progress", Duration.between(before, after));
+ }
+
+ abstract String[] getBackendCommand();
+
+ final GenericContainer createHaproxyContainer()
+ {
+ return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+ .withCommand("-f", "/etc/haproxy")
+ .withNetwork(network)
+ .withNetworkAliases("haproxy")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("haproxy.cfg"),
+ "/etc/haproxy/haproxy.cfg")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("dataplaneapi.yml"),
+ "/etc/haproxy/dataplaneapi.yml")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("sharding.map"),
+ "/etc/haproxy/maps/sharding.map")
+ .withExposedPorts(8400, 8401, 8404, 5555)
+ .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
+ }
+
+ final GenericContainer createBackendContainer(String id)
+ {
+ return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
+ .withImagePullPolicy(NEVER_PULL)
+ .withNetwork(network)
+ .withNetworkAliases("backend-ID".replaceAll("ID", id))
+ .withCommand(Arrays.stream(getBackendCommand())
+ .map(commandPart -> commandPart.replaceAll("ID", id))
+ .toArray(size -> new String[size]))
+ .withExposedPorts(8080)
+ .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
+ .withLogConsumer(new Slf4jLogConsumer(
+ log,
+ true
+ )
+ .withPrefix("BACKEND-ID".replaceAll("ID", id)));
+ }
+
+ private String readHaproxyMap()
+ {
+ return createHaproxyWebClient()
+ .get()
+ .uri(uriBuilder -> uriBuilder
+ .path("/services/haproxy/runtime/maps_entries")
+ .queryParam(MAP_PARAM, MAP_NAME)
+ .build())
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToFlux(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToFlux(MapEntryTo.class);
+ }
+ else
+ {
+ return response.<MapEntryTo>createError().flux();
+ }
+ })
+ .map(entry -> entry.key() + "=" + entry.value())
+ .reduce((a, b) -> a + ", " + b)
+ .block();
+ }
+
+ private WebClient createHaproxyWebClient()
+ {
+ return WebClient
+ .builder()
+ .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
+ .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
+ .build();
+ }
+
+ static final String MAP_NAME = "sharding";
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+class KafkaHandoverIT extends AbstractHandoverIT
+{
+ KafkaHandoverIT()
+ {
+ super(new KafkaHandoverITContainers());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+
+
+@Slf4j
+class KafkaHandoverITContainers extends AbstractHandoverITContainers
+{
+ @Override
+ void setUpExtra() throws IOException, InterruptedException
+ {
+ kafka.start();
+
+ Container.ExecResult result;
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "info_channel",
+ "--partitions",
+ "3");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "data_channel",
+ "--partitions",
+ "10");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+ }
+
+
+ private final KafkaContainer kafka =
+ new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
+ .withNetwork(network)
+ .withNetworkAliases("kafka")
+ .withListener(() -> "kafka:9999")
+ .withKraft()
+ .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
+ .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
+
+
+ @Override
+ String[] getBackendCommand()
+ {
+ return new String[]
+ {
+ "--chat.backend.instance-id=backend_ID",
+ "--chat.backend.services=kafka",
+ "--chat.backend.kafka.bootstrap-servers=kafka:9999",
+ "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
+ "--chat.backend.kafka.num-partitions=10",
+ "--chat.backend.kafka.client-id-prefix=BID",
+ "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/",
+ "--chat.backend.kafka.haproxy-user=juplo",
+ "--chat.backend.kafka.haproxy-password=juplo",
+ "--chat.backend.kafka.haproxy-map=sharding",
+ "--logging.level.de.juplo=DEBUG"
+ };
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.Getter;
+import lombok.Setter;
+
+
+
+@Getter
+@Setter
+class StatusTo
+{
+ String status;
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.*;
+
+
+@Slf4j
+public class TestListener
+{
+ static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
+ public Flux<MessageTo> run()
+ {
+ return Flux
+ .fromArray(chatRooms)
+ .flatMap(chatRoom ->
+ {
+ List<MessageTo> list = new LinkedList<>();
+ receivedMessages.put(chatRoom.getId(), list);
+ return receiveMessages(chatRoom);
+ });
+ }
+
+ Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+ {
+ log.info("Requesting messages for chat-room {}", chatRoom);
+ List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+ return receiveServerSentEvents(chatRoom)
+ .flatMap(sse ->
+ {
+ try
+ {
+ return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+ }
+ catch (Exception e)
+ {
+ return Mono.error(e);
+ }
+ })
+ .doOnNext(message -> list.add(message))
+ .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
+ .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe))
+ .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
+ }
+
+ Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
+ {
+ return webClient
+ .get()
+ .uri(
+ "/{chatRoomId}/listen",
+ chatRoom.getId())
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .header("X-Shard", chatRoom.getShard().toString())
+ .retrieve()
+ .bodyToFlux(SSE_TYPE)
+ .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
+ }
+
+
+ private final WebClient webClient;
+ private final ChatRoomInfoTo[] chatRooms;
+ private final ObjectMapper objectMapper;
+
+ final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
+
+
+ TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
+ {
+ webClient = WebClient.create("http://localhost:" + port);
+ this.chatRooms = chatRooms;
+ objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+ objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+@ToString(of = "user")
+@Slf4j
+public class TestWriter
+{
+ public Mono<Void> run()
+ {
+ return Flux
+ .fromIterable((Iterable<Integer>) () -> new Iterator<>()
+ {
+ private int i = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return running;
+ }
+
+ @Override
+ public Integer next()
+ {
+ return i++;
+ }
+ })
+ .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
+ .map(i -> "Message #" + i)
+ .concatMap(message -> sendMessage(chatRoom, message)
+ .doOnError(throwable ->
+ {
+ WebClientResponseException e = (WebClientResponseException)throwable;
+ log.info(
+ "Could not sent message {} for {}: {}",
+ message,
+ user.getName(),
+ e.getResponseBodyAsString());
+ })
+ .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1))))
+ .doOnNext(message ->
+ {
+ numSentMessages++;
+ sentMessages.add(message);
+ log.info(
+ "{} sent a message to {}: {}",
+ user,
+ chatRoom,
+ message);
+ })
+ .doOnError(throwable ->
+ {
+ WebClientResponseException e = (WebClientResponseException)throwable.getCause();
+ log.error(
+ "{} failed sending a message: {}",
+ user,
+ e.getResponseBodyAsString(Charset.defaultCharset()));
+ })
+ .takeUntil(message -> !running)
+ .doOnComplete(() -> log.info("TestWriter {} is done", user))
+ .then();
+ }
+
+ private Mono<MessageTo> sendMessage(
+ ChatRoomInfoTo chatRoom,
+ String message)
+ {
+ return webClient
+ .put()
+ .uri(
+ "/{chatRoomId}/{username}/{serial}",
+ chatRoom.getId(),
+ user.getName(),
+ user.nextSerial())
+ .contentType(MediaType.TEXT_PLAIN)
+ .accept(MediaType.APPLICATION_JSON)
+ .header("X-Shard", chatRoom.getShard().toString())
+ .bodyValue(message)
+ .exchangeToMono(response ->
+ {
+ if (response.statusCode().equals(HttpStatus.OK))
+ {
+ return response.bodyToMono(MessageTo.class);
+ }
+ else
+ {
+ return response.createError();
+ }
+ });
+ }
+
+
+ private final WebClient webClient;
+
+ final ChatRoomInfoTo chatRoom;
+ final User user;
+ final List<MessageTo> sentMessages = new LinkedList<>();
+
+ volatile boolean running = true;
+ @Getter
+ private volatile int numSentMessages = 0;
+
+
+ TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
+ {
+ webClient = WebClient.create("http://localhost:" + port);
+ this.chatRoom = chatRoom;
+ user = new User(username);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+
+@EqualsAndHashCode(of = "name")
+@ToString(of = "name")
+class User
+{
+ @Getter
+ private final String name;
+ private int serial = 0;
+
+
+ User (String name)
+ {
+ this.name = name;
+ }
+
+
+ int nextSerial()
+ {
+ return ++serial;
+ }
+}