-version: '3.2'
services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.7.1
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ports:
+ - 2181:2181
+ volumes:
+ - zookeeper-data:/var/lib/zookeeper/data
+ - zookeeper-log:/var/lib/zookeeper/log
kafka-1:
- image: bitnami/kafka:3.4
+ image: confluentinc/cp-kafka:7.7.1
environment:
- KAFKA_ENABLE_KRAFT: 'yes'
- KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
- KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9081
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081
- KAFKA_CFG_NODE_ID: 1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
- ALLOW_PLAINTEXT_LISTENER: 'yes'
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9081
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081
+ KAFKA_BROKER_ID: 1
+ KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
+ volumes:
+ - kafka-1-data:/var/lib/kafka/data
ports:
- 9081:9081
+ stop_grace_period: 120s
+ depends_on:
+ - zookeeper
kafka-2:
- image: bitnami/kafka:3.4
+ image: confluentinc/cp-kafka:7.7.1
environment:
- KAFKA_ENABLE_KRAFT: 'yes'
- KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
- KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9082
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082
- KAFKA_CFG_NODE_ID: 2
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
- ALLOW_PLAINTEXT_LISTENER: 'yes'
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9082
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082
+ KAFKA_BROKER_ID: 2
+ KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
+ volumes:
+ - kafka-2-data:/var/lib/kafka/data
ports:
- 9092:9082
- 9082:9082
default:
aliases:
- kafka
+ stop_grace_period: 120s
+ depends_on:
+ - zookeeper
kafka-3:
- image: bitnami/kafka:3.4
+ image: confluentinc/cp-kafka:7.7.1
environment:
- KAFKA_ENABLE_KRAFT: 'yes'
- KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
- KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9083
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083
- KAFKA_CFG_NODE_ID: 3
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
- ALLOW_PLAINTEXT_LISTENER: 'yes'
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9083
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083
+ KAFKA_BROKER_ID: 3
+ KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
+ volumes:
+ - kafka-3-data:/var/lib/kafka/data
ports:
- 9083:9083
+ stop_grace_period: 120s
+ depends_on:
+ - zookeeper
+ schema-registry:
+ image: confluentinc/cp-schema-registry:7.7.1
+ environment:
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+ SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+ ports:
+ - 8085:8085
+ depends_on:
+ - kafka-1
+ - kafka-2
+ - kafka-3
- setup:
+ connect:
+ image: confluentinc/cp-kafka-connect:7.7.1
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
+ CONNECT_REST_PORT: 8083
+ CONNECT_REST_LISTENERS: http://0.0.0.0:8083
+ CONNECT_REST_ADVERTISED_HOST_NAME: connect
+ CONNECT_CONFIG_STORAGE_TOPIC: __connect-config
+ CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets
+ CONNECT_STATUS_STORAGE_TOPIC: __connect-status
+ CONNECT_GROUP_ID: kafka-connect
+ CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
+ CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
+ CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
+ CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
+ CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
+ CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
+ CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_PLUGIN_PATH: /usr/share/java/
+ ports:
+ - 8083:8083
+ depends_on:
+ - schema-registry
+
+ cli:
image: juplo/toolbox
- command: >
- bash -c "
- kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
- kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
- echo Das Topic \'test\' wurde erfolgreich angelegt:
- kafka-topics --bootstrap-server kafka:9092 --describe --topic test
- echo \'docker-compose restart -t0 setup\' löscht das Topic und legt es neu an
- sleep infinity
- "
+ command: sleep infinity
+ stop_grace_period: 0s
depends_on:
- kafka-1
- kafka-2
- kafka-3
+ setup:
+ image: juplo/toolbox
+ command:
+ - bash
+ - -c
+ - |
+ cub kafka-ready -b kafka-1:9092,kafka-2:9092,kafka-3:9092 3 60 > /dev/null 2>&1 || exit 1
+ if [ -e INITIALIZED ]
+ then
+ echo -n Bereits konfiguriert:
+ cat INITIALIZED
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ else
+ kafka-topics --bootstrap-server kafka:9092 \
+ --delete \
+ --if-exists \
+ --topic test
+ kafka-topics --bootstrap-server kafka:9092 \
+ --create \
+ --topic test \
+ --partitions 2 \
+ --replication-factor 3 \
+ --config min.insync.replicas=2 \
+ && echo Das Topic \'test\' wurde erfolgreich angelegt: \
+ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+ && date > INITIALIZED
+ fi
+ stop_grace_period: 0s
+ depends_on:
+ - cli
+
+ zoonavigator:
+ image: elkozmon/zoonavigator:1.1.2
+ ports:
+ - "8000:80"
+ environment:
+ HTTP_PORT: 80
+ CONNECTION_JUPLO_NAME: juplo
+ CONNECTION_JUPLO_CONN: zookeeper:2181
+ AUTO_CONNECT_CONNECTION_ID: JUPLO
+ depends_on:
+ - zookeeper
+
akhq:
image: tchiotludo/akhq:0.23.0
ports:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
+ schema-registry:
+ url: "http://schema-registry:8085"
+ connect:
+ - name: "connect"
+ url: "http://connect:8083"
depends_on:
- kafka-1
- kafka-2
- kafka-3
- cli:
- image: juplo/toolbox
- command: sleep infinity
- depends_on:
- - setup
-
producer:
- image: juplo/supersimple-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-SNAPSHOT
environment:
- spring.kafka.bootstrap-servers: kafka:9092
- spring.kafka.client-id: producer
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: producer
+ juplo.producer.topic: test
+ juplo.producer.linger-ms: 666
+ juplo.producer.throttle-ms: 100
consumer:
- image: juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT
+ image: juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
spring.kafka.consumer.auto-offset-reset: earliest
logging.level.org.apache.kafka.clients.consumer: INFO
+ juplo.consumer.topic: test
+
+volumes:
+ zookeeper-data:
+ zookeeper-log:
+ kafka-1-data:
+ kafka-2-data:
+ kafka-3-data:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.2</version>
+ <version>3.3.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-consumer-kafkalistener</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.0-SNAPSHOT</version>
+ <version>1.1-kafkalistener-SNAPSHOT</version>
<properties>
- <java.version>17</java.version>
+ <java.version>21</java.version>
</properties>
<dependencies>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-validation</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ </plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
- <version>0.33.0</version>
+ <version>0.45.0</version>
<configuration>
<images>
<image>
</execution>
</executions>
</plugin>
- <plugin>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
</plugins>
</build>
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+import java.time.Duration;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest(
- webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "simple.consumer.topic=" + TOPIC })
-@EmbeddedKafka(topics = TOPIC)
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "juplo.consumer.topic=" + TOPIC })
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
public class ApplicationTests
{
- public static final String TOPIC = "FOO";
-
- @LocalServerPort
- private int port;
+ static final String TOPIC = "FOO";
+ static final int PARTITIONS = 10;
@Autowired
- private TestRestTemplate restTemplate;
+ MockMvc mockMvc;
@Test
public void testApplicationStartup()
{
- restTemplate.getForObject(
- "http://localhost:" + port + "/actuator/health",
- String.class
- )
- .contains("UP");
+ await("Application is healthy")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> mockMvc
+ .perform(get("/actuator/health"))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("status").value("UP")));
}
}