From 52bb266bf7dc00a4f5b7a21173412544be35e0ea Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 14 Nov 2024 20:52:03 +0100 Subject: [PATCH] =?utf8?q?Implementierung=20&=20Setup=20an=20die=20=C3=BCb?= =?utf8?q?erarbeitete=20Consumer-=C3=9Cbung=20angepasst?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .editorconfig | 4 +- Dockerfile | 2 +- README.sh | 24 +- docker/docker-compose.yml | 221 +++++++++++++----- pom.xml | 23 +- .../de/juplo/kafka/ApplicationProperties.java | 22 +- .../java/de/juplo/kafka/ApplicationTests.java | 37 +-- 7 files changed, 232 insertions(+), 101 deletions(-) diff --git a/.editorconfig b/.editorconfig index 633c98a..c71516c 100644 --- a/.editorconfig +++ b/.editorconfig @@ -7,7 +7,7 @@ tab_width = 2 charset = utf-8 end_of_line = lf trim_trailing_whitespace = true -insert_final_newline = false +insert_final_newline = true [*.properties] -charset = latin1 \ No newline at end of file +charset = latin1 diff --git a/Dockerfile b/Dockerfile index ae52522..9e196ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:21-jre VOLUME /tmp COPY target/*.jar /opt/app.jar ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] diff --git a/README.sh b/README.sh index 2bb7508..6b1d575 100755 --- a/README.sh +++ b/README.sh @@ -1,16 +1,16 @@ #!/bin/bash -IMAGE=juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT if [ "$1" = "cleanup" ] then - docker-compose -f docker/docker-compose.yml down -t0 -v --remove-orphans + docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans mvn clean exit fi -docker-compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker-compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 +docker compose -f docker/docker-compose.yml rm -svf consumer if [[ $(docker image ls -q $IMAGE) == "" || @@ -23,11 +23,17 @@ else docker image ls $IMAGE fi -echo "Waiting for the Kafka-Cluster to become ready..." -docker-compose -f docker/docker-compose.yml run --rm cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1 +docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 -docker-compose -f docker/docker-compose.yml up -t0 -d cli -docker-compose -f docker/docker-compose.yml up -d producer consumer +docker compose -f docker/docker-compose.yml up -d producer +docker compose -f docker/docker-compose.yml up -d consumer + +sleep 5 +docker compose -f docker/docker-compose.yml stop consumer + +docker compose -f docker/docker-compose.yml start consumer sleep 5 -docker-compose -f docker/docker-compose.yml logs consumer + +docker compose -f docker/docker-compose.yml stop producer consumer +docker compose -f docker/docker-compose.yml logs consumer diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c65d152..5e5c055 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,39 +1,48 @@ -version: '3.2' services: + zookeeper: + image: confluentinc/cp-zookeeper:7.7.1 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ports: + - 2181:2181 + volumes: + - zookeeper-data:/var/lib/zookeeper/data + - zookeeper-log:/var/lib/zookeeper/log kafka-1: - image: bitnami/kafka:3.4 + image: confluentinc/cp-kafka:7.7.1 environment: - KAFKA_ENABLE_KRAFT: 'yes' - KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9081 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081 - KAFKA_CFG_NODE_ID: 1 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093 - ALLOW_PLAINTEXT_LISTENER: 'yes' - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9081 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081 + KAFKA_BROKER_ID: 1 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000 + volumes: + - kafka-1-data:/var/lib/kafka/data ports: - 9081:9081 + stop_grace_period: 120s + depends_on: + - zookeeper kafka-2: - image: bitnami/kafka:3.4 + image: confluentinc/cp-kafka:7.7.1 environment: - KAFKA_ENABLE_KRAFT: 'yes' - KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9082 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082 - KAFKA_CFG_NODE_ID: 2 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093 - ALLOW_PLAINTEXT_LISTENER: 'yes' - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9082 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082 + KAFKA_BROKER_ID: 2 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000 + volumes: + - kafka-2-data:/var/lib/kafka/data ports: - 9092:9082 - 9082:9082 @@ -41,42 +50,124 @@ services: default: aliases: - kafka + stop_grace_period: 120s + depends_on: + - zookeeper kafka-3: - image: bitnami/kafka:3.4 + image: confluentinc/cp-kafka:7.7.1 environment: - KAFKA_ENABLE_KRAFT: 'yes' - KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9083 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083 - KAFKA_CFG_NODE_ID: 3 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093 - ALLOW_PLAINTEXT_LISTENER: 'yes' - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9083 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083 + KAFKA_BROKER_ID: 3 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000 + volumes: + - kafka-3-data:/var/lib/kafka/data ports: - 9083:9083 + stop_grace_period: 120s + depends_on: + - zookeeper + schema-registry: + image: confluentinc/cp-schema-registry:7.7.1 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092 + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + ports: + - 8085:8085 + depends_on: + - kafka-1 + - kafka-2 + - kafka-3 - setup: + connect: + image: confluentinc/cp-kafka-connect:7.7.1 + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092 + CONNECT_REST_PORT: 8083 + CONNECT_REST_LISTENERS: http://0.0.0.0:8083 + CONNECT_REST_ADVERTISED_HOST_NAME: connect + CONNECT_CONFIG_STORAGE_TOPIC: __connect-config + CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: __connect-status + CONNECT_GROUP_ID: kafka-connect + CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true" + CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085 + CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true" + CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085 + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_PLUGIN_PATH: /usr/share/java/ + ports: + - 8083:8083 + depends_on: + - schema-registry + + cli: image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - echo Das Topic \'test\' wurde erfolgreich angelegt: - kafka-topics --bootstrap-server kafka:9092 --describe --topic test - echo \'docker-compose restart -t0 setup\' löscht das Topic und legt es neu an - sleep infinity - " + command: sleep infinity + stop_grace_period: 0s depends_on: - kafka-1 - kafka-2 - kafka-3 + setup: + image: juplo/toolbox + command: + - bash + - -c + - | + cub kafka-ready -b kafka-1:9092,kafka-2:9092,kafka-3:9092 3 60 > /dev/null 2>&1 || exit 1 + if [ -e INITIALIZED ] + then + echo -n Bereits konfiguriert: + cat INITIALIZED + kafka-topics --bootstrap-server kafka:9092 --describe --topic test + else + kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic test + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic test \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + && echo Das Topic \'test\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ + && date > INITIALIZED + fi + stop_grace_period: 0s + depends_on: + - cli + + zoonavigator: + image: elkozmon/zoonavigator:1.1.2 + ports: + - "8000:80" + environment: + HTTP_PORT: 80 + CONNECTION_JUPLO_NAME: juplo + CONNECTION_JUPLO_CONN: zookeeper:2181 + AUTO_CONNECT_CONNECTION_ID: JUPLO + depends_on: + - zookeeper + akhq: image: tchiotludo/akhq:0.23.0 ports: @@ -88,27 +179,37 @@ services: docker-kafka-server: properties: bootstrap.servers: "kafka:9092" + schema-registry: + url: "http://schema-registry:8085" + connect: + - name: "connect" + url: "http://connect:8083" depends_on: - kafka-1 - kafka-2 - kafka-3 - cli: - image: juplo/toolbox - command: sleep infinity - depends_on: - - setup - producer: - image: juplo/supersimple-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-SNAPSHOT environment: - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: producer + juplo.bootstrap-server: kafka:9092 + juplo.client-id: producer + juplo.producer.topic: test + juplo.producer.linger-ms: 666 + juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer spring.kafka.consumer.auto-offset-reset: earliest logging.level.org.apache.kafka.clients.consumer: INFO + juplo.consumer.topic: test + +volumes: + zookeeper-data: + zookeeper-log: + kafka-1-data: + kafka-2-data: + kafka-3-data: diff --git a/pom.xml b/pom.xml index 6f88590..122e62f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.7.2 + 3.3.4 @@ -15,10 +15,10 @@ spring-consumer-kafkalistener Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.0-SNAPSHOT + 1.1-kafkalistener-SNAPSHOT - 17 + 21 @@ -26,10 +26,6 @@ org.springframework.boot spring-boot-starter-web - - org.springframework.boot - spring-boot-starter-validation - org.springframework.boot spring-boot-starter-actuator @@ -39,6 +35,10 @@ spring-boot-configuration-processor true + + org.springframework.boot + spring-boot-starter-validation + org.springframework.kafka spring-kafka @@ -72,10 +72,14 @@ + + pl.project13.maven + git-commit-id-plugin + io.fabric8 docker-maven-plugin - 0.33.0 + 0.45.0 @@ -93,9 +97,6 @@ - - maven-failsafe-plugin - diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index a4cc8b8..b8caf5c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -9,13 +9,29 @@ import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -@ConfigurationProperties(prefix = "simple.consumer") +@ConfigurationProperties(prefix = "juplo") @Validated @Getter @Setter public class ApplicationProperties { @NotNull - @NotEmpty - private String topic; + private ConsumerProperties consumer; + + + public ConsumerProperties getConsumerProperties() + { + return consumer; + } + + + @Validated + @Getter + @Setter + static class ConsumerProperties + { + @NotNull + @NotEmpty + private String topic; + } } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 9344bcf..e4b97a4 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -2,39 +2,46 @@ package de.juplo.kafka; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import java.time.Duration; + +import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( - webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "simple.consumer.topic=" + TOPIC }) -@EmbeddedKafka(topics = TOPIC) + "spring.kafka.consumer.auto-offset-reset=earliest", + "juplo.consumer.topic=" + TOPIC }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests { - public static final String TOPIC = "FOO"; - - @LocalServerPort - private int port; + static final String TOPIC = "FOO"; + static final int PARTITIONS = 10; @Autowired - private TestRestTemplate restTemplate; + MockMvc mockMvc; @Test public void testApplicationStartup() { - restTemplate.getForObject( - "http://localhost:" + port + "/actuator/health", - String.class - ) - .contains("UP"); + await("Application is healthy") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> mockMvc + .perform(get("/actuator/health")) + .andExpect(status().isOk()) + .andExpect(jsonPath("status").value("UP"))); } } -- 2.20.1