]> juplo.de Git - demos/kafka/chat/commitdiff
refactor: Moved the ``HandoverIT``-tests into the ``api``-package -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 25 Mar 2024 21:07:34 +0000 (22:07 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 25 Mar 2024 22:23:07 +0000 (23:23 +0100)
16 files changed:
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/StatusTo.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/TestListener.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/TestWriter.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/User.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/api/User.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java
deleted file mode 100644 (file)
index e92155d..0000000
+++ /dev/null
@@ -1,143 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import pl.rzrz.assertj.reactor.Assertions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-
-@Slf4j
-public abstract class AbstractHandoverIT
-{
-  static final int NUM_CHATROOMS = 23;
-  static final int NUM_CLIENTS = 17;
-
-
-  private final AbstractHandoverITContainers containers;
-
-
-  AbstractHandoverIT(AbstractHandoverITContainers containers)
-  {
-    this.containers = containers;
-  }
-
-
-  @Test
-  void test() throws InterruptedException
-  {
-    log.info("Starting backend-1...");
-    containers.startBackend(containers.backend1, new TestWriter[0]);
-    log.info("backend-1 started!");
-
-    ChatRoomInfoTo[] chatRooms = Flux
-        .range(0, NUM_CHATROOMS)
-        .flatMap(i -> createChatRoom("room-" + i))
-        .toStream()
-        .toArray(size -> new ChatRoomInfoTo[size]);
-
-    int port = containers.haproxy.getMappedPort(8400);
-
-    CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
-    TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
-    for (int i = 0; i < NUM_CLIENTS; i++)
-    {
-      TestWriter testWriter = new TestWriter(
-          port,
-          chatRooms[i % NUM_CHATROOMS],
-          "user-" + i);
-      testWriters[i] = testWriter;
-      testWriterFutures[i] = testWriter
-          .run()
-          .toFuture();
-    }
-
-    TestListener testListener = new TestListener(port, chatRooms);
-    testListener
-        .run()
-        .subscribe(message -> log.info(
-            "Received message: {}",
-            message));
-
-    log.info("Starting backend-2...");
-    containers.startBackend(containers.backend2, testWriters);
-    log.info("backend-2 started!");
-
-    log.info("Starting backend-3...");
-    containers.startBackend(containers.backend3, testWriters);
-    log.info("backend-3 started!");
-
-    for (int i = 0; i < NUM_CLIENTS; i++)
-    {
-      testWriters[i].running = false;
-      testWriterFutures[i].join();
-      log.info("Joined TestWriter {}", testWriters[i].user);
-    }
-
-    Awaitility
-        .await()
-        .atMost(Duration.ofSeconds(5))
-        .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener));
-  }
-
-  private void assertAllSentMessagesReceived(
-      TestWriter[] testWriters,
-      TestListener testListener)
-  {
-    for (int i = 0; i < NUM_CLIENTS; i++)
-    {
-      TestWriter testWriter = testWriters[i];
-      ChatRoomInfoTo chatRoom = testWriter.chatRoom;
-      List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
-
-      Assertions.assertThat(receivedMessages
-          .stream()
-          .filter(message -> message.getUser().equals(testWriter.user.getName()))
-          ).containsExactlyElementsOf(testWriter.sentMessages);
-    }
-  }
-
-  Mono<ChatRoomInfoTo> createChatRoom(String name)
-  {
-    return webClient
-        .post()
-        .uri("/create")
-        .contentType(MediaType.TEXT_PLAIN)
-        .bodyValue(name)
-        .accept(MediaType.APPLICATION_JSON)
-        .exchangeToMono(response ->
-        {
-          if (response.statusCode().equals(HttpStatus.OK))
-          {
-            return response.bodyToMono(ChatRoomInfoTo.class);
-          }
-          else
-          {
-            return response.createError();
-          }
-        });
-  }
-
-
-  WebClient webClient;
-
-  @BeforeEach
-  void setUp() throws Exception
-  {
-    containers.setUp();
-
-    Integer port = containers.haproxy.getMappedPort(8400);
-    webClient = WebClient.create("http://localhost:" + port);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverITContainers.java
deleted file mode 100644 (file)
index 0b245a9..0000000
+++ /dev/null
@@ -1,256 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
-import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.HttpStatusCode;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.images.ImagePullPolicy;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Arrays;
-
-import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
-
-
-@Slf4j
-public abstract class AbstractHandoverITContainers
-{
-  static final ImagePullPolicy NEVER_PULL = imageName -> false;
-
-
-  final Network network = Network.newNetwork();
-  final GenericContainer haproxy, backend1, backend2, backend3;
-
-
-  AbstractHandoverITContainers()
-  {
-    haproxy = createHaproxyContainer();
-    haproxy.start();
-
-    backend1 = createBackendContainer("1");
-    backend2 = createBackendContainer("2");
-    backend3 = createBackendContainer("3");
-  }
-
-
-  void setUpExtra() throws Exception
-  {
-    log.info("This setup does not need any extra containers");
-  }
-
-  void setUp() throws Exception
-  {
-    setUpExtra();
-  }
-
-  void startBackend(
-      GenericContainer backend,
-      TestWriter[] testWriters)
-  {
-    backend.start();
-
-    int[] numSentMessages = Arrays
-        .stream(testWriters)
-        .mapToInt(testWriter -> testWriter.getNumSentMessages())
-        .toArray();
-
-    String backendUri = "http://localhost:" + backend.getMappedPort(8080);
-
-    Instant before, after;
-
-    before = Instant.now();
-    HttpStatusCode statusCode = WebClient
-        .create(backendUri)
-        .get()
-        .uri("/actuator/health")
-        .exchangeToMono(response ->
-        {
-          log.info("{} responded with {}", backendUri, response.statusCode());
-          return Mono.just(response.statusCode());
-        })
-        .flatMap(status -> switch (status.value())
-        {
-          case 200, 503 -> Mono.just(status);
-          default -> Mono.error(new RuntimeException(status.toString()));
-        })
-        .retryWhen(Retry.backoff(30, Duration.ofSeconds(1)))
-        .block();
-    after = Instant.now();
-    log.info("Took {} to reach status {}", Duration.between(before, after), statusCode);
-
-    before = Instant.now();
-    Awaitility
-        .await()
-        .atMost(Duration.ofSeconds(45))
-        .until(() -> WebClient
-            .create(backendUri)
-            .get()
-            .uri("/actuator/health")
-            .exchangeToMono(response ->
-            {
-              log.info("{} responded with {}", backendUri, response.statusCode());
-              if (response.statusCode().equals(HttpStatus.OK))
-              {
-                return response
-                    .bodyToMono(StatusTo.class)
-                    .map(StatusTo::getStatus)
-                    .map(status -> status.equalsIgnoreCase("UP"));
-              }
-              else
-              {
-                return Mono.just(false);
-              }
-            })
-            .block());
-    after = Instant.now();
-    log.info("Took {} until the backend reported status UP", Duration.between(before, after));
-
-    haproxy
-        .getDockerClient()
-        .killContainerCmd(haproxy.getContainerId())
-        .withSignal("HUP")
-        .exec();
-
-    before = Instant.now();
-    Awaitility
-        .await()
-        .atMost(Duration.ofSeconds(15))
-        .until(() -> WebClient
-            .create("http://localhost:" + haproxy.getMappedPort(8400))
-            .get()
-            .uri("/actuator/health")
-            .exchangeToMono(response ->
-            {
-              log.info("{} responded with {}", backendUri, response.statusCode());
-              if (response.statusCode().equals(HttpStatus.OK))
-              {
-                return response
-                    .bodyToMono(StatusTo.class)
-                    .map(StatusTo::getStatus)
-                    .map(status -> status.equalsIgnoreCase("UP"));
-              }
-              else
-              {
-                return Mono.just(false);
-              }
-            })
-            .block());
-    after = Instant.now();
-    log.info("Took {} until haproxy reported status UP", Duration.between(before, after));
-
-    before = Instant.now();
-    Awaitility
-        .await()
-        .atMost(Duration.ofSeconds(30))
-        .until(() ->
-        {
-          for (int i = 0; i < testWriters.length; i++)
-          {
-            TestWriter testWriter = testWriters[i];
-            int sentTotal = testWriter.getNumSentMessages();
-            if (sentTotal == numSentMessages[i])
-            {
-              log.info(
-                  "No progress for {}: sent-before={}, sent-total={}, map: {}",
-                  testWriter,
-                  numSentMessages[i],
-                  sentTotal,
-                  readHaproxyMap());
-              return false;
-            }
-          }
-
-          return true;
-        });
-    after = Instant.now();
-    log.info("Took {} until all writers made some progress", Duration.between(before, after));
-  }
-
-  abstract String[] getBackendCommand();
-
-  final GenericContainer createHaproxyContainer()
-  {
-    return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
-        .withCommand("-f", "/etc/haproxy")
-        .withNetwork(network)
-        .withNetworkAliases("haproxy")
-        .withCopyFileToContainer(
-            MountableFile.forClasspathResource("haproxy.cfg"),
-            "/etc/haproxy/haproxy.cfg")
-        .withCopyFileToContainer(
-            MountableFile.forClasspathResource("dataplaneapi.yml"),
-            "/etc/haproxy/dataplaneapi.yml")
-        .withCopyFileToContainer(
-            MountableFile.forClasspathResource("sharding.map"),
-            "/etc/haproxy/maps/sharding.map")
-        .withExposedPorts(8400, 8401, 8404, 5555)
-        .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
-  }
-
-  final GenericContainer createBackendContainer(String id)
-  {
-    return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
-      .withImagePullPolicy(NEVER_PULL)
-      .withNetwork(network)
-      .withNetworkAliases("backend-ID".replaceAll("ID", id))
-      .withCommand(Arrays.stream(getBackendCommand())
-          .map(commandPart -> commandPart.replaceAll("ID", id))
-          .toArray(size -> new String[size]))
-      .withExposedPorts(8080)
-      .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
-      .withLogConsumer(new Slf4jLogConsumer(
-          log,
-          true
-          )
-          .withPrefix("BACKEND-ID".replaceAll("ID", id)));
-  }
-
-  private String readHaproxyMap()
-  {
-    return createHaproxyWebClient()
-        .get()
-        .uri(uriBuilder -> uriBuilder
-            .path("/services/haproxy/runtime/maps_entries")
-            .queryParam(MAP_PARAM, MAP_NAME)
-            .build())
-        .accept(MediaType.APPLICATION_JSON)
-        .exchangeToFlux(response ->
-        {
-          if (response.statusCode().equals(HttpStatus.OK))
-          {
-            return response.bodyToFlux(MapEntryTo.class);
-          }
-          else
-          {
-            return response.<MapEntryTo>createError().flux();
-          }
-        })
-        .map(entry -> entry.key() + "=" + entry.value())
-        .reduce((a, b) -> a + ", " + b)
-        .block();
-  }
-
-  private WebClient createHaproxyWebClient()
-  {
-    return WebClient
-        .builder()
-        .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
-        .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
-        .build();
-  }
-
-  static final String MAP_NAME = "sharding";
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverIT.java
deleted file mode 100644 (file)
index 0a6c75d..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-@Slf4j
-class KafkaHandoverIT extends AbstractHandoverIT
-{
-  KafkaHandoverIT()
-  {
-    super(new KafkaHandoverITContainers());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaHandoverITContainers.java
deleted file mode 100644 (file)
index 9a88012..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.DockerImageName;
-
-import java.io.IOException;
-
-
-@Slf4j
-class KafkaHandoverITContainers extends AbstractHandoverITContainers
-{
-  @Override
-  void setUpExtra() throws IOException, InterruptedException
-  {
-    kafka.start();
-
-    Container.ExecResult result;
-    result = kafka.execInContainer(
-        "kafka-topics",
-        "--bootstrap-server",
-        "kafka:9999",
-        "--create",
-        "--topic",
-        "info_channel",
-        "--partitions",
-        "3");
-    log.info(
-        "EXIT-CODE={}, STDOUT={}, STDERR={}",
-        result.getExitCode(),
-        result.getStdout(),
-        result.getStdout());
-    result = kafka.execInContainer(
-        "kafka-topics",
-        "--bootstrap-server",
-        "kafka:9999",
-        "--create",
-        "--topic",
-        "data_channel",
-        "--partitions",
-        "10");
-    log.info(
-        "EXIT-CODE={}, STDOUT={}, STDERR={}",
-        result.getExitCode(),
-        result.getStdout(),
-        result.getStdout());
-  }
-
-
-  private final KafkaContainer kafka =
-      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
-          .withNetwork(network)
-          .withNetworkAliases("kafka")
-          .withListener(() -> "kafka:9999")
-          .withKraft()
-          .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
-          .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
-
-
-  @Override
-  String[] getBackendCommand()
-  {
-    return new String[]
-    {
-        "--chat.backend.instance-id=backend_ID",
-        "--chat.backend.services=kafka",
-        "--chat.backend.kafka.bootstrap-servers=kafka:9999",
-        "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
-        "--chat.backend.kafka.num-partitions=10",
-        "--chat.backend.kafka.client-id-prefix=BID",
-        "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/",
-        "--chat.backend.kafka.haproxy-user=juplo",
-        "--chat.backend.kafka.haproxy-password=juplo",
-        "--chat.backend.kafka.haproxy-map=sharding",
-        "--logging.level.de.juplo=DEBUG"
-    };
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/StatusTo.java
deleted file mode 100644 (file)
index f620942..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import lombok.Getter;
-import lombok.Setter;
-
-
-
-@Getter
-@Setter
-class StatusTo
-{
-  String status;
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/TestListener.java
deleted file mode 100644 (file)
index 092cd43..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.ParameterizedTypeReference;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.*;
-
-
-@Slf4j
-public class TestListener
-{
-  static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
-
-
-  public Flux<MessageTo> run()
-  {
-    return Flux
-        .fromArray(chatRooms)
-        .flatMap(chatRoom ->
-        {
-          List<MessageTo> list = new LinkedList<>();
-          receivedMessages.put(chatRoom.getId(), list);
-          return receiveMessages(chatRoom);
-        });
-  }
-
-  Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
-  {
-    log.info("Requesting messages for chat-room {}", chatRoom);
-    List<MessageTo> list = receivedMessages.get(chatRoom.getId());
-    return receiveServerSentEvents(chatRoom)
-        .flatMap(sse ->
-        {
-          try
-          {
-            return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
-          }
-          catch (Exception e)
-          {
-            return Mono.error(e);
-          }
-        })
-        .doOnNext(message -> list.add(message))
-        .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
-        .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe))
-        .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
-  }
-
-  Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
-  {
-    return webClient
-        .get()
-        .uri(
-            "/{chatRoomId}/listen",
-            chatRoom.getId())
-        .accept(MediaType.TEXT_EVENT_STREAM)
-        .header("X-Shard", chatRoom.getShard().toString())
-        .retrieve()
-        .bodyToFlux(SSE_TYPE)
-        .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
-  }
-
-
-  private final WebClient webClient;
-  private final ChatRoomInfoTo[] chatRooms;
-  private final ObjectMapper objectMapper;
-
-  final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
-
-
-  TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
-  {
-    webClient = WebClient.create("http://localhost:" + port);
-    this.chatRooms = chatRooms;
-    objectMapper = new ObjectMapper();
-    objectMapper.registerModule(new JavaTimeModule());
-    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java
deleted file mode 100644 (file)
index 4220d00..0000000
+++ /dev/null
@@ -1,129 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import lombok.Getter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.nio.charset.Charset;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-
-@ToString(of = "user")
-@Slf4j
-public class TestWriter
-{
-  public Mono<Void> run()
-  {
-    return Flux
-        .fromIterable((Iterable<Integer>) () -> new Iterator<>()
-        {
-          private int i = 0;
-
-          @Override
-          public boolean hasNext()
-          {
-            return running;
-          }
-
-          @Override
-          public Integer next()
-          {
-            return i++;
-          }
-        })
-        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
-        .map(i -> "Message #" + i)
-        .concatMap(message -> sendMessage(chatRoom, message)
-            .doOnError(throwable ->
-            {
-              WebClientResponseException e = (WebClientResponseException)throwable;
-              log.info(
-                "Could not sent message {} for {}: {}",
-                message,
-                user.getName(),
-                e.getResponseBodyAsString());
-            })
-            .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1))))
-        .doOnNext(message ->
-        {
-          numSentMessages++;
-          sentMessages.add(message);
-          log.info(
-              "{} sent a message to {}: {}",
-             user,
-             chatRoom,
-             message);
-        })
-        .doOnError(throwable ->
-        {
-          WebClientResponseException e = (WebClientResponseException)throwable.getCause();
-          log.error(
-              "{} failed sending a message: {}",
-              user,
-              e.getResponseBodyAsString(Charset.defaultCharset()));
-        })
-        .takeUntil(message -> !running)
-        .doOnComplete(() -> log.info("TestWriter {} is done", user))
-        .then();
-  }
-
-  private Mono<MessageTo> sendMessage(
-      ChatRoomInfoTo chatRoom,
-      String message)
-  {
-    return webClient
-        .put()
-        .uri(
-            "/{chatRoomId}/{username}/{serial}",
-            chatRoom.getId(),
-            user.getName(),
-            user.nextSerial())
-        .contentType(MediaType.TEXT_PLAIN)
-        .accept(MediaType.APPLICATION_JSON)
-        .header("X-Shard", chatRoom.getShard().toString())
-        .bodyValue(message)
-        .exchangeToMono(response ->
-        {
-          if (response.statusCode().equals(HttpStatus.OK))
-          {
-            return response.bodyToMono(MessageTo.class);
-          }
-          else
-          {
-            return response.createError();
-          }
-        });
-  }
-
-
-  private final WebClient webClient;
-
-  final ChatRoomInfoTo chatRoom;
-  final User user;
-  final List<MessageTo> sentMessages = new LinkedList<>();
-
-  volatile boolean running = true;
-  @Getter
-  private volatile int numSentMessages = 0;
-
-
-  TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
-  {
-    webClient = WebClient.create("http://localhost:" + port);
-    this.chatRoom = chatRoom;
-    user = new User(username);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/User.java b/src/test/java/de/juplo/kafka/chat/backend/User.java
deleted file mode 100644 (file)
index bcd97b4..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka.chat.backend;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-
-
-@EqualsAndHashCode(of = "name")
-@ToString(of = "name")
-class User
-{
-  @Getter
-  private final String name;
-  private int serial = 0;
-
-
-  User (String name)
-  {
-    this.name = name;
-  }
-
-
-  int nextSerial()
-  {
-    return ++serial;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverIT.java
new file mode 100644 (file)
index 0000000..e92155d
--- /dev/null
@@ -0,0 +1,143 @@
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import pl.rzrz.assertj.reactor.Assertions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@Slf4j
+public abstract class AbstractHandoverIT
+{
+  static final int NUM_CHATROOMS = 23;
+  static final int NUM_CLIENTS = 17;
+
+
+  private final AbstractHandoverITContainers containers;
+
+
+  AbstractHandoverIT(AbstractHandoverITContainers containers)
+  {
+    this.containers = containers;
+  }
+
+
+  @Test
+  void test() throws InterruptedException
+  {
+    log.info("Starting backend-1...");
+    containers.startBackend(containers.backend1, new TestWriter[0]);
+    log.info("backend-1 started!");
+
+    ChatRoomInfoTo[] chatRooms = Flux
+        .range(0, NUM_CHATROOMS)
+        .flatMap(i -> createChatRoom("room-" + i))
+        .toStream()
+        .toArray(size -> new ChatRoomInfoTo[size]);
+
+    int port = containers.haproxy.getMappedPort(8400);
+
+    CompletableFuture<Void>[] testWriterFutures = new CompletableFuture[NUM_CLIENTS];
+    TestWriter[] testWriters = new TestWriter[NUM_CLIENTS];
+    for (int i = 0; i < NUM_CLIENTS; i++)
+    {
+      TestWriter testWriter = new TestWriter(
+          port,
+          chatRooms[i % NUM_CHATROOMS],
+          "user-" + i);
+      testWriters[i] = testWriter;
+      testWriterFutures[i] = testWriter
+          .run()
+          .toFuture();
+    }
+
+    TestListener testListener = new TestListener(port, chatRooms);
+    testListener
+        .run()
+        .subscribe(message -> log.info(
+            "Received message: {}",
+            message));
+
+    log.info("Starting backend-2...");
+    containers.startBackend(containers.backend2, testWriters);
+    log.info("backend-2 started!");
+
+    log.info("Starting backend-3...");
+    containers.startBackend(containers.backend3, testWriters);
+    log.info("backend-3 started!");
+
+    for (int i = 0; i < NUM_CLIENTS; i++)
+    {
+      testWriters[i].running = false;
+      testWriterFutures[i].join();
+      log.info("Joined TestWriter {}", testWriters[i].user);
+    }
+
+    Awaitility
+        .await()
+        .atMost(Duration.ofSeconds(5))
+        .untilAsserted(() -> assertAllSentMessagesReceived(testWriters, testListener));
+  }
+
+  private void assertAllSentMessagesReceived(
+      TestWriter[] testWriters,
+      TestListener testListener)
+  {
+    for (int i = 0; i < NUM_CLIENTS; i++)
+    {
+      TestWriter testWriter = testWriters[i];
+      ChatRoomInfoTo chatRoom = testWriter.chatRoom;
+      List<MessageTo> receivedMessages = testListener.receivedMessages.get(chatRoom.getId());
+
+      Assertions.assertThat(receivedMessages
+          .stream()
+          .filter(message -> message.getUser().equals(testWriter.user.getName()))
+          ).containsExactlyElementsOf(testWriter.sentMessages);
+    }
+  }
+
+  Mono<ChatRoomInfoTo> createChatRoom(String name)
+  {
+    return webClient
+        .post()
+        .uri("/create")
+        .contentType(MediaType.TEXT_PLAIN)
+        .bodyValue(name)
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToMono(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToMono(ChatRoomInfoTo.class);
+          }
+          else
+          {
+            return response.createError();
+          }
+        });
+  }
+
+
+  WebClient webClient;
+
+  @BeforeEach
+  void setUp() throws Exception
+  {
+    containers.setUp();
+
+    Integer port = containers.haproxy.getMappedPort(8400);
+    webClient = WebClient.create("http://localhost:" + port);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/api/AbstractHandoverITContainers.java
new file mode 100644 (file)
index 0000000..0b245a9
--- /dev/null
@@ -0,0 +1,256 @@
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapEntryTo;
+import de.juplo.kafka.chat.backend.implementation.haproxy.MapInfoTo;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.ImagePullPolicy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+
+import static de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy.MAP_PARAM;
+
+
+@Slf4j
+public abstract class AbstractHandoverITContainers
+{
+  static final ImagePullPolicy NEVER_PULL = imageName -> false;
+
+
+  final Network network = Network.newNetwork();
+  final GenericContainer haproxy, backend1, backend2, backend3;
+
+
+  AbstractHandoverITContainers()
+  {
+    haproxy = createHaproxyContainer();
+    haproxy.start();
+
+    backend1 = createBackendContainer("1");
+    backend2 = createBackendContainer("2");
+    backend3 = createBackendContainer("3");
+  }
+
+
+  void setUpExtra() throws Exception
+  {
+    log.info("This setup does not need any extra containers");
+  }
+
+  void setUp() throws Exception
+  {
+    setUpExtra();
+  }
+
+  void startBackend(
+      GenericContainer backend,
+      TestWriter[] testWriters)
+  {
+    backend.start();
+
+    int[] numSentMessages = Arrays
+        .stream(testWriters)
+        .mapToInt(testWriter -> testWriter.getNumSentMessages())
+        .toArray();
+
+    String backendUri = "http://localhost:" + backend.getMappedPort(8080);
+
+    Instant before, after;
+
+    before = Instant.now();
+    HttpStatusCode statusCode = WebClient
+        .create(backendUri)
+        .get()
+        .uri("/actuator/health")
+        .exchangeToMono(response ->
+        {
+          log.info("{} responded with {}", backendUri, response.statusCode());
+          return Mono.just(response.statusCode());
+        })
+        .flatMap(status -> switch (status.value())
+        {
+          case 200, 503 -> Mono.just(status);
+          default -> Mono.error(new RuntimeException(status.toString()));
+        })
+        .retryWhen(Retry.backoff(30, Duration.ofSeconds(1)))
+        .block();
+    after = Instant.now();
+    log.info("Took {} to reach status {}", Duration.between(before, after), statusCode);
+
+    before = Instant.now();
+    Awaitility
+        .await()
+        .atMost(Duration.ofSeconds(45))
+        .until(() -> WebClient
+            .create(backendUri)
+            .get()
+            .uri("/actuator/health")
+            .exchangeToMono(response ->
+            {
+              log.info("{} responded with {}", backendUri, response.statusCode());
+              if (response.statusCode().equals(HttpStatus.OK))
+              {
+                return response
+                    .bodyToMono(StatusTo.class)
+                    .map(StatusTo::getStatus)
+                    .map(status -> status.equalsIgnoreCase("UP"));
+              }
+              else
+              {
+                return Mono.just(false);
+              }
+            })
+            .block());
+    after = Instant.now();
+    log.info("Took {} until the backend reported status UP", Duration.between(before, after));
+
+    haproxy
+        .getDockerClient()
+        .killContainerCmd(haproxy.getContainerId())
+        .withSignal("HUP")
+        .exec();
+
+    before = Instant.now();
+    Awaitility
+        .await()
+        .atMost(Duration.ofSeconds(15))
+        .until(() -> WebClient
+            .create("http://localhost:" + haproxy.getMappedPort(8400))
+            .get()
+            .uri("/actuator/health")
+            .exchangeToMono(response ->
+            {
+              log.info("{} responded with {}", backendUri, response.statusCode());
+              if (response.statusCode().equals(HttpStatus.OK))
+              {
+                return response
+                    .bodyToMono(StatusTo.class)
+                    .map(StatusTo::getStatus)
+                    .map(status -> status.equalsIgnoreCase("UP"));
+              }
+              else
+              {
+                return Mono.just(false);
+              }
+            })
+            .block());
+    after = Instant.now();
+    log.info("Took {} until haproxy reported status UP", Duration.between(before, after));
+
+    before = Instant.now();
+    Awaitility
+        .await()
+        .atMost(Duration.ofSeconds(30))
+        .until(() ->
+        {
+          for (int i = 0; i < testWriters.length; i++)
+          {
+            TestWriter testWriter = testWriters[i];
+            int sentTotal = testWriter.getNumSentMessages();
+            if (sentTotal == numSentMessages[i])
+            {
+              log.info(
+                  "No progress for {}: sent-before={}, sent-total={}, map: {}",
+                  testWriter,
+                  numSentMessages[i],
+                  sentTotal,
+                  readHaproxyMap());
+              return false;
+            }
+          }
+
+          return true;
+        });
+    after = Instant.now();
+    log.info("Took {} until all writers made some progress", Duration.between(before, after));
+  }
+
+  abstract String[] getBackendCommand();
+
+  final GenericContainer createHaproxyContainer()
+  {
+    return new GenericContainer(DockerImageName.parse("haproxytech/haproxy-debian:2.8"))
+        .withCommand("-f", "/etc/haproxy")
+        .withNetwork(network)
+        .withNetworkAliases("haproxy")
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("haproxy.cfg"),
+            "/etc/haproxy/haproxy.cfg")
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("dataplaneapi.yml"),
+            "/etc/haproxy/dataplaneapi.yml")
+        .withCopyFileToContainer(
+            MountableFile.forClasspathResource("sharding.map"),
+            "/etc/haproxy/maps/sharding.map")
+        .withExposedPorts(8400, 8401, 8404, 5555)
+        .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("HAPROXY"));
+  }
+
+  final GenericContainer createBackendContainer(String id)
+  {
+    return new GenericContainer(DockerImageName.parse("juplo/chat-backend:0.0.1-SNAPSHOT"))
+      .withImagePullPolicy(NEVER_PULL)
+      .withNetwork(network)
+      .withNetworkAliases("backend-ID".replaceAll("ID", id))
+      .withCommand(Arrays.stream(getBackendCommand())
+          .map(commandPart -> commandPart.replaceAll("ID", id))
+          .toArray(size -> new String[size]))
+      .withExposedPorts(8080)
+      .waitingFor(Wait.forLogMessage(".*Started\\ ChatBackendApplication.*\\n", 1))
+      .withLogConsumer(new Slf4jLogConsumer(
+          log,
+          true
+          )
+          .withPrefix("BACKEND-ID".replaceAll("ID", id)));
+  }
+
+  private String readHaproxyMap()
+  {
+    return createHaproxyWebClient()
+        .get()
+        .uri(uriBuilder -> uriBuilder
+            .path("/services/haproxy/runtime/maps_entries")
+            .queryParam(MAP_PARAM, MAP_NAME)
+            .build())
+        .accept(MediaType.APPLICATION_JSON)
+        .exchangeToFlux(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToFlux(MapEntryTo.class);
+          }
+          else
+          {
+            return response.<MapEntryTo>createError().flux();
+          }
+        })
+        .map(entry -> entry.key() + "=" + entry.value())
+        .reduce((a, b) -> a + ", " + b)
+        .block();
+  }
+
+  private WebClient createHaproxyWebClient()
+  {
+    return WebClient
+        .builder()
+        .baseUrl("http://localhost:" + haproxy.getMappedPort(5555) + "/v2/")
+        .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth("juplo", "juplo"))
+        .build();
+  }
+
+  static final String MAP_NAME = "sharding";
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverIT.java
new file mode 100644 (file)
index 0000000..0a6c75d
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+class KafkaHandoverIT extends AbstractHandoverIT
+{
+  KafkaHandoverIT()
+  {
+    super(new KafkaHandoverITContainers());
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java b/src/test/java/de/juplo/kafka/chat/backend/api/KafkaHandoverITContainers.java
new file mode 100644 (file)
index 0000000..9a88012
--- /dev/null
@@ -0,0 +1,81 @@
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+
+
+@Slf4j
+class KafkaHandoverITContainers extends AbstractHandoverITContainers
+{
+  @Override
+  void setUpExtra() throws IOException, InterruptedException
+  {
+    kafka.start();
+
+    Container.ExecResult result;
+    result = kafka.execInContainer(
+        "kafka-topics",
+        "--bootstrap-server",
+        "kafka:9999",
+        "--create",
+        "--topic",
+        "info_channel",
+        "--partitions",
+        "3");
+    log.info(
+        "EXIT-CODE={}, STDOUT={}, STDERR={}",
+        result.getExitCode(),
+        result.getStdout(),
+        result.getStdout());
+    result = kafka.execInContainer(
+        "kafka-topics",
+        "--bootstrap-server",
+        "kafka:9999",
+        "--create",
+        "--topic",
+        "data_channel",
+        "--partitions",
+        "10");
+    log.info(
+        "EXIT-CODE={}, STDOUT={}, STDERR={}",
+        result.getExitCode(),
+        result.getStdout(),
+        result.getStdout());
+  }
+
+
+  private final KafkaContainer kafka =
+      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
+          .withNetwork(network)
+          .withNetworkAliases("kafka")
+          .withListener(() -> "kafka:9999")
+          .withKraft()
+          .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
+          .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
+
+
+  @Override
+  String[] getBackendCommand()
+  {
+    return new String[]
+    {
+        "--chat.backend.instance-id=backend_ID",
+        "--chat.backend.services=kafka",
+        "--chat.backend.kafka.bootstrap-servers=kafka:9999",
+        "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
+        "--chat.backend.kafka.num-partitions=10",
+        "--chat.backend.kafka.client-id-prefix=BID",
+        "--chat.backend.kafka.haproxy-data-plane-api=http://haproxy:5555/v2/",
+        "--chat.backend.kafka.haproxy-user=juplo",
+        "--chat.backend.kafka.haproxy-password=juplo",
+        "--chat.backend.kafka.haproxy-map=sharding",
+        "--logging.level.de.juplo=DEBUG"
+    };
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java b/src/test/java/de/juplo/kafka/chat/backend/api/StatusTo.java
new file mode 100644 (file)
index 0000000..f620942
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.chat.backend;
+
+import lombok.Getter;
+import lombok.Setter;
+
+
+
+@Getter
+@Setter
+class StatusTo
+{
+  String status;
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java b/src/test/java/de/juplo/kafka/chat/backend/api/TestListener.java
new file mode 100644 (file)
index 0000000..092cd43
--- /dev/null
@@ -0,0 +1,91 @@
+package de.juplo.kafka.chat.backend;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.*;
+
+
+@Slf4j
+public class TestListener
+{
+  static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
+
+
+  public Flux<MessageTo> run()
+  {
+    return Flux
+        .fromArray(chatRooms)
+        .flatMap(chatRoom ->
+        {
+          List<MessageTo> list = new LinkedList<>();
+          receivedMessages.put(chatRoom.getId(), list);
+          return receiveMessages(chatRoom);
+        });
+  }
+
+  Flux<MessageTo> receiveMessages(ChatRoomInfoTo chatRoom)
+  {
+    log.info("Requesting messages for chat-room {}", chatRoom);
+    List<MessageTo> list = receivedMessages.get(chatRoom.getId());
+    return receiveServerSentEvents(chatRoom)
+        .flatMap(sse ->
+        {
+          try
+          {
+            return Mono.just(objectMapper.readValue(sse.data(), MessageTo.class));
+          }
+          catch (Exception e)
+          {
+            return Mono.error(e);
+          }
+        })
+        .doOnNext(message -> list.add(message))
+        .doOnComplete(() -> log.info("Listening to {} was completed!", chatRoom))
+        .doOnError(throwalbe -> log.error("Listening to {} failed!", chatRoom, throwalbe))
+        .thenMany(Flux.defer(() -> receiveMessages(chatRoom)));
+  }
+
+  Flux<ServerSentEvent<String>> receiveServerSentEvents(ChatRoomInfoTo chatRoom)
+  {
+    return webClient
+        .get()
+        .uri(
+            "/{chatRoomId}/listen",
+            chatRoom.getId())
+        .accept(MediaType.TEXT_EVENT_STREAM)
+        .header("X-Shard", chatRoom.getShard().toString())
+        .retrieve()
+        .bodyToFlux(SSE_TYPE)
+        .retryWhen(Retry.fixedDelay(15, Duration.ofSeconds(1)));
+  }
+
+
+  private final WebClient webClient;
+  private final ChatRoomInfoTo[] chatRooms;
+  private final ObjectMapper objectMapper;
+
+  final Map<UUID, List<MessageTo>> receivedMessages = new HashMap<>();
+
+
+  TestListener(Integer port, ChatRoomInfoTo[] chatRooms)
+  {
+    webClient = WebClient.create("http://localhost:" + port);
+    this.chatRooms = chatRooms;
+    objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new JavaTimeModule());
+    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/api/TestWriter.java
new file mode 100644 (file)
index 0000000..4220d00
--- /dev/null
@@ -0,0 +1,129 @@
+package de.juplo.kafka.chat.backend;
+
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+@ToString(of = "user")
+@Slf4j
+public class TestWriter
+{
+  public Mono<Void> run()
+  {
+    return Flux
+        .fromIterable((Iterable<Integer>) () -> new Iterator<>()
+        {
+          private int i = 0;
+
+          @Override
+          public boolean hasNext()
+          {
+            return running;
+          }
+
+          @Override
+          public Integer next()
+          {
+            return i++;
+          }
+        })
+        .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
+        .map(i -> "Message #" + i)
+        .concatMap(message -> sendMessage(chatRoom, message)
+            .doOnError(throwable ->
+            {
+              WebClientResponseException e = (WebClientResponseException)throwable;
+              log.info(
+                "Could not sent message {} for {}: {}",
+                message,
+                user.getName(),
+                e.getResponseBodyAsString());
+            })
+            .retryWhen(Retry.fixedDelay(60, Duration.ofSeconds(1))))
+        .doOnNext(message ->
+        {
+          numSentMessages++;
+          sentMessages.add(message);
+          log.info(
+              "{} sent a message to {}: {}",
+             user,
+             chatRoom,
+             message);
+        })
+        .doOnError(throwable ->
+        {
+          WebClientResponseException e = (WebClientResponseException)throwable.getCause();
+          log.error(
+              "{} failed sending a message: {}",
+              user,
+              e.getResponseBodyAsString(Charset.defaultCharset()));
+        })
+        .takeUntil(message -> !running)
+        .doOnComplete(() -> log.info("TestWriter {} is done", user))
+        .then();
+  }
+
+  private Mono<MessageTo> sendMessage(
+      ChatRoomInfoTo chatRoom,
+      String message)
+  {
+    return webClient
+        .put()
+        .uri(
+            "/{chatRoomId}/{username}/{serial}",
+            chatRoom.getId(),
+            user.getName(),
+            user.nextSerial())
+        .contentType(MediaType.TEXT_PLAIN)
+        .accept(MediaType.APPLICATION_JSON)
+        .header("X-Shard", chatRoom.getShard().toString())
+        .bodyValue(message)
+        .exchangeToMono(response ->
+        {
+          if (response.statusCode().equals(HttpStatus.OK))
+          {
+            return response.bodyToMono(MessageTo.class);
+          }
+          else
+          {
+            return response.createError();
+          }
+        });
+  }
+
+
+  private final WebClient webClient;
+
+  final ChatRoomInfoTo chatRoom;
+  final User user;
+  final List<MessageTo> sentMessages = new LinkedList<>();
+
+  volatile boolean running = true;
+  @Getter
+  private volatile int numSentMessages = 0;
+
+
+  TestWriter(Integer port, ChatRoomInfoTo chatRoom, String username)
+  {
+    webClient = WebClient.create("http://localhost:" + port);
+    this.chatRoom = chatRoom;
+    user = new User(username);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/User.java b/src/test/java/de/juplo/kafka/chat/backend/api/User.java
new file mode 100644 (file)
index 0000000..bcd97b4
--- /dev/null
@@ -0,0 +1,27 @@
+package de.juplo.kafka.chat.backend;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+
+@EqualsAndHashCode(of = "name")
+@ToString(of = "name")
+class User
+{
+  @Getter
+  private final String name;
+  private int serial = 0;
+
+
+  User (String name)
+  {
+    this.name = name;
+  }
+
+
+  int nextSerial()
+  {
+    return ++serial;
+  }
+}