@Testcontainers
@Slf4j
-public abstract class HandoverIT
+public abstract class AbstractHandoverIT
{
static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {};
- private final HandoverITContainers containers;
+ private final AbstractHandoverITContainers containers;
- HandoverIT(HandoverITContainers containers)
+ AbstractHandoverIT(AbstractHandoverITContainers containers)
{
this.containers = containers;
}
@Slf4j
-public abstract class HandoverITContainers
+public abstract class AbstractHandoverITContainers
{
static final ImagePullPolicy NEVER_PULL = imageName -> false;
final GenericContainer haproxy, backend1, backend2, backend3;
- HandoverITContainers()
+ AbstractHandoverITContainers()
{
haproxy = createHaproxyContainer();
backend1 = createBackendContainer("1");
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+
+
+@Slf4j
+class KafkaAbstractHandoverITContainers extends AbstractHandoverITContainers
+{
+ private final KafkaContainer kafka;
+
+
+ KafkaAbstractHandoverITContainers()
+ {
+ kafka = createKafkaContainer();
+ }
+
+
+ @Override
+ void setUpExtra() throws IOException, InterruptedException
+ {
+ kafka.start();
+
+ Container.ExecResult result;
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "info_channel",
+ "--partitions",
+ "3");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+ result = kafka.execInContainer(
+ "kafka-topics",
+ "--bootstrap-server",
+ "kafka:9999",
+ "--create",
+ "--topic",
+ "data_channel",
+ "--partitions",
+ "10");
+ log.info(
+ "EXIT-CODE={}, STDOUT={}, STDERR={}",
+ result.getExitCode(),
+ result.getStdout(),
+ result.getStdout());
+ }
+
+ private final KafkaContainer createKafkaContainer()
+ {
+ return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
+ .withNetwork(network)
+ .withNetworkAliases("kafka")
+ .withListener(() -> "kafka:9999")
+ .withKraft()
+ .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
+ .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
+ }
+
+ @Override
+ String[] getBackendCommand()
+ {
+ return new String[]
+ {
+ "--chat.backend.instance-id=backend-ID",
+ "--chat.backend.services=kafka",
+ "--chat.backend.kafka.bootstrap-servers=kafka:9999",
+ "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
+ "--chat.backend.kafka.num-partitions=10",
+ "--chat.backend.kafka.client-id-prefix=BID",
+ "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
+ "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
+ };
+ }
+}
@Slf4j
-class KafkaHandoverIT extends HandoverIT
+class KafkaHandoverIT extends AbstractHandoverIT
{
KafkaHandoverIT()
{
- super(new KafkaHandoverITContainers());
+ super(new KafkaAbstractHandoverITContainers());
}
}
+++ /dev/null
-package de.juplo.kafka.chat.backend;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.DockerImageName;
-
-import java.io.IOException;
-
-
-@Slf4j
-class KafkaHandoverITContainers extends HandoverITContainers
-{
- private final KafkaContainer kafka;
-
-
- KafkaHandoverITContainers()
- {
- kafka = createKafkaContainer();
- }
-
-
- @Override
- void setUpExtra() throws IOException, InterruptedException
- {
- kafka.start();
-
- Container.ExecResult result;
- result = kafka.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "info_channel",
- "--partitions",
- "3");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
- result = kafka.execInContainer(
- "kafka-topics",
- "--bootstrap-server",
- "kafka:9999",
- "--create",
- "--topic",
- "data_channel",
- "--partitions",
- "10");
- log.info(
- "EXIT-CODE={}, STDOUT={}, STDERR={}",
- result.getExitCode(),
- result.getStdout(),
- result.getStdout());
- }
-
- private final KafkaContainer createKafkaContainer()
- {
- return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
- .withNetwork(network)
- .withNetworkAliases("kafka")
- .withListener(() -> "kafka:9999")
- .withKraft()
- .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
- .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
- }
-
- @Override
- String[] getBackendCommand()
- {
- return new String[]
- {
- "--chat.backend.instance-id=backend-ID",
- "--chat.backend.services=kafka",
- "--chat.backend.kafka.bootstrap-servers=kafka:9999",
- "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
- "--chat.backend.kafka.num-partitions=10",
- "--chat.backend.kafka.client-id-prefix=BID",
- "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
- "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
- };
- }
-}