WIP:test: HandoverIT-POC - splitted up code into smaller classes -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaHandoverITContainers.java
1 package de.juplo.kafka.chat.backend;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.testcontainers.containers.Container;
5 import org.testcontainers.containers.KafkaContainer;
6 import org.testcontainers.containers.output.Slf4jLogConsumer;
7 import org.testcontainers.containers.wait.strategy.Wait;
8 import org.testcontainers.utility.DockerImageName;
9
10 import java.io.IOException;
11
12
13 @Slf4j
14 class KafkaHandoverITContainers extends HandoverITContainers
15 {
16   private final KafkaContainer kafka;
17
18
19   KafkaHandoverITContainers()
20   {
21     kafka = createKafkaContainer();
22   }
23
24
25   @Override
26   void setUpExtra() throws IOException, InterruptedException
27   {
28     kafka.start();
29
30     Container.ExecResult result;
31     result = kafka.execInContainer(
32         "kafka-topics",
33         "--bootstrap-server",
34         "kafka:9999",
35         "--create",
36         "--topic",
37         "info_channel",
38         "--partitions",
39         "3");
40     log.info(
41         "EXIT-CODE={}, STDOUT={}, STDERR={}",
42         result.getExitCode(),
43         result.getStdout(),
44         result.getStdout());
45     result = kafka.execInContainer(
46         "kafka-topics",
47         "--bootstrap-server",
48         "kafka:9999",
49         "--create",
50         "--topic",
51         "data_channel",
52         "--partitions",
53         "10");
54     log.info(
55         "EXIT-CODE={}, STDOUT={}, STDERR={}",
56         result.getExitCode(),
57         result.getStdout(),
58         result.getStdout());
59   }
60
61   private final KafkaContainer createKafkaContainer()
62   {
63     return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
64         .withNetwork(network)
65         .withNetworkAliases("kafka")
66         .withListener(() -> "kafka:9999")
67         .withKraft()
68         .waitingFor(Wait.forLogMessage(".*Kafka\\ Server\\ started.*\\n", 1))
69         .withLogConsumer(new Slf4jLogConsumer(log, true).withPrefix("KAFKA"));
70   }
71
72   @Override
73   String[] getBackendCommand()
74   {
75     return new String[]
76     {
77         "--chat.backend.instance-id=backend-ID",
78         "--chat.backend.services=kafka",
79         "--chat.backend.kafka.bootstrap-servers=kafka:9999",
80         "--chat.backend.kafka.instance-uri=http://backend-ID:8080",
81         "--chat.backend.kafka.num-partitions=10",
82         "--chat.backend.kafka.client-id-prefix=BID",
83         "--chat.backend.kafka.haproxy-runtime-api=haproxy:8401",
84         "--chat.backend.kafka.haproxy-map=/usr/local/etc/haproxy/sharding.map"
85     };
86   }
87 }