Implementation & setup adapted to the revised consumer exercise
author     Kai Moritz <kai@juplo.de>
           Thu, 14 Nov 2024 19:52:03 +0000 (20:52 +0100)
committer  Kai Moritz <kai@juplo.de>
           Thu, 14 Nov 2024 20:24:51 +0000 (21:24 +0100)
.editorconfig
Dockerfile
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/.editorconfig b/.editorconfig
index 633c98a..c71516c 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -7,7 +7,7 @@ tab_width = 2
 charset = utf-8
 end_of_line = lf
 trim_trailing_whitespace = true
-insert_final_newline = false
+insert_final_newline = true
 
 [*.properties]
-charset = latin1
\ No newline at end of file
+charset = latin1
diff --git a/Dockerfile b/Dockerfile
index ae52522..9e196ff 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM openjdk:17-jdk-slim
+FROM eclipse-temurin:21-jre
 VOLUME /tmp
 COPY target/*.jar /opt/app.jar
 ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
diff --git a/README.sh b/README.sh
index 2bb7508..6b1d575 100755
--- a/README.sh
+++ b/README.sh
@@ -1,16 +1,16 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
-  docker-compose -f docker/docker-compose.yml down -t0 -v --remove-orphans
+  docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans
   mvn clean
   exit
 fi
 
-docker-compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker-compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
+docker compose -f docker/docker-compose.yml rm -svf consumer
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -23,11 +23,17 @@ else
   docker image ls $IMAGE
 fi
 
-echo "Waiting for the Kafka-Cluster to become ready..."
-docker-compose -f docker/docker-compose.yml run --rm cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1
+docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
-docker-compose -f docker/docker-compose.yml up -t0 -d cli
 
-docker-compose -f docker/docker-compose.yml up -d producer consumer
+docker compose -f docker/docker-compose.yml up -d producer
+docker compose -f docker/docker-compose.yml up -d consumer
+
+sleep 5
+docker compose -f docker/docker-compose.yml stop consumer
+
+docker compose -f docker/docker-compose.yml start consumer
 sleep 5
-docker-compose -f docker/docker-compose.yml logs consumer
+
+docker compose -f docker/docker-compose.yml stop producer consumer
+docker compose -f docker/docker-compose.yml logs consumer
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index c65d152..5e5c055 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -1,39 +1,48 @@
-version: '3.2'
 services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.7.1
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      - 2181:2181
+    volumes:
+      - zookeeper-data:/var/lib/zookeeper/data
+      - zookeeper-log:/var/lib/zookeeper/log
 
   kafka-1:
-    image: bitnami/kafka:3.4
+    image: confluentinc/cp-kafka:7.7.1
     environment:
-      KAFKA_ENABLE_KRAFT: 'yes'
-      KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
-      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-      KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9081
-      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081
-      KAFKA_CFG_NODE_ID: 1
-      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
-      ALLOW_PLAINTEXT_LISTENER: 'yes'
-      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
-      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
-      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9081
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_BROKER_ID: 1
+      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
+    volumes:
+      - kafka-1-data:/var/lib/kafka/data
     ports:
       - 9081:9081
+    stop_grace_period: 120s
+    depends_on:
+      - zookeeper
 
   kafka-2:
-    image: bitnami/kafka:3.4
+    image: confluentinc/cp-kafka:7.7.1
     environment:
-      KAFKA_ENABLE_KRAFT: 'yes'
-      KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
-      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-      KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9082
-      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082
-      KAFKA_CFG_NODE_ID: 2
-      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
-      ALLOW_PLAINTEXT_LISTENER: 'yes'
-      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
-      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
-      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9082
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082
+      KAFKA_BROKER_ID: 2
+      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
+    volumes:
+      - kafka-2-data:/var/lib/kafka/data
     ports:
       - 9092:9082
       - 9082:9082
@@ -41,42 +50,124 @@ services:
       default:
         aliases:
           - kafka
+    stop_grace_period: 120s
+    depends_on:
+      - zookeeper
 
   kafka-3:
-    image: bitnami/kafka:3.4
+    image: confluentinc/cp-kafka:7.7.1
     environment:
-      KAFKA_ENABLE_KRAFT: 'yes'
-      KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw
-      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
-      KAFKA_CFG_LISTENERS: CONTROLLER://:9093, BROKER://:9092, LOCALHOST://:9083
-      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083
-      KAFKA_CFG_NODE_ID: 3
-      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093, 2@kafka-2:9093, 3@kafka-3:9093
-      ALLOW_PLAINTEXT_LISTENER: 'yes'
-      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER
-      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
-      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9083
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_BROKER_ID: 3
+      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
+    volumes:
+      - kafka-3-data:/var/lib/kafka/data
     ports:
       - 9083:9083
+    stop_grace_period: 120s
+    depends_on:
+      - zookeeper
 
+  schema-registry:
+    image: confluentinc/cp-schema-registry:7.7.1
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+    ports:
+      - 8085:8085
+    depends_on:
+      - kafka-1
+      - kafka-2
+      - kafka-3
 
-  setup:
+  connect:
+    image: confluentinc/cp-kafka-connect:7.7.1
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
+      CONNECT_REST_PORT: 8083
+      CONNECT_REST_LISTENERS: http://0.0.0.0:8083
+      CONNECT_REST_ADVERTISED_HOST_NAME: connect
+      CONNECT_CONFIG_STORAGE_TOPIC: __connect-config
+      CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: __connect-status
+      CONNECT_GROUP_ID: kafka-connect
+      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
+      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
+      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
+      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8085
+      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_PLUGIN_PATH: /usr/share/java/
+    ports:
+      - 8083:8083
+    depends_on:
+      - schema-registry
+
+  cli:
     image: juplo/toolbox
-    command: >
-      bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
-        echo Das Topic \'test\' wurde erfolgreich angelegt:
-        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-        echo \'docker-compose restart -t0 setup\' löscht das Topic und legt es neu an
-        sleep infinity
-      "
+    command: sleep infinity
+    stop_grace_period: 0s
     depends_on:
       - kafka-1
       - kafka-2
       - kafka-3
 
+  setup:
+    image: juplo/toolbox
+    command:
+      - bash
+      - -c
+      - |
+        cub kafka-ready -b kafka-1:9092,kafka-2:9092,kafka-3:9092 3 60 > /dev/null 2>&1 || exit 1
+        if [ -e INITIALIZED ]
+        then
+          echo -n Bereits konfiguriert: 
+          cat INITIALIZED
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+        else
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --delete \
+                       --if-exists \
+                       --topic test
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --create \
+                       --topic test \
+                       --partitions 2 \
+                       --replication-factor 3 \
+                       --config min.insync.replicas=2 \
+          && echo Das Topic \'test\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+          && date > INITIALIZED
+        fi
+    stop_grace_period: 0s
+    depends_on:
+      - cli
+
+  zoonavigator:
+    image: elkozmon/zoonavigator:1.1.2
+    ports:
+      - "8000:80"
+    environment:
+      HTTP_PORT: 80
+      CONNECTION_JUPLO_NAME: juplo
+      CONNECTION_JUPLO_CONN: zookeeper:2181
+      AUTO_CONNECT_CONNECTION_ID: JUPLO
+    depends_on:
+      - zookeeper
+
   akhq:
     image: tchiotludo/akhq:0.23.0
     ports:
@@ -88,27 +179,37 @@ services:
             docker-kafka-server:
               properties:
                 bootstrap.servers: "kafka:9092"
+              schema-registry:
+                url: "http://schema-registry:8085"
+              connect:
+                - name: "connect"
+                  url: "http://connect:8083"
     depends_on:
       - kafka-1
       - kafka-2
       - kafka-3
 
-  cli:
-    image: juplo/toolbox
-    command: sleep infinity
-    depends_on:
-      - setup
-
   producer:
-    image: juplo/supersimple-producer:1.0-SNAPSHOT
+    image: juplo/spring-producer:1.0-SNAPSHOT
     environment:
-      spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafka.client-id: producer
+      juplo.bootstrap-server: kafka:9092
+      juplo.client-id: producer
+      juplo.producer.topic: test
+      juplo.producer.linger-ms: 666
+      juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT
+    image: juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
       spring.kafka.consumer.auto-offset-reset: earliest
       logging.level.org.apache.kafka.clients.consumer: INFO
+      juplo.consumer.topic: test
+
+volumes:
+  zookeeper-data:
+  zookeeper-log:
+  kafka-1-data:
+  kafka-2-data:
+  kafka-3-data:
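
The consumer service above points at juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT, the image built from this project; the listener code itself is not part of this diff. As a rough sketch only, a @KafkaListener-based consumer that resolves its topic from the juplo.consumer.topic property could look like the following (class name, log format and the String/String types are assumptions, not taken from the commit):

    package de.juplo.kafka;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class ExampleConsumer
    {
      // The topic is resolved from the nested configuration property juplo.consumer.topic
      @KafkaListener(topics = "${juplo.consumer.topic}")
      public void receive(ConsumerRecord<String, String> record)
      {
        // Log key, value and position of every received record
        log.info(
            "Received {}={} from partition {} at offset {}",
            record.key(),
            record.value(),
            record.partition(),
            record.offset());
      }
    }

The spring.kafka.* values passed in via the compose file (bootstrap servers, client-id, auto-offset-reset) are picked up by Spring Boot's auto-configured listener-container factory, so the sketch needs no additional container wiring.
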
diff --git a/pom.xml b/pom.xml
index 6f88590..122e62f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.7.2</version>
+    <version>3.3.4</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
 
   <artifactId>spring-consumer-kafkalistener</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.0-SNAPSHOT</version>
+  <version>1.1-kafkalistener-SNAPSHOT</version>
 
   <properties>
-    <java.version>17</java.version>
+    <java.version>21</java.version>
   </properties>
 
   <dependencies>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-validation</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-actuator</artifactId>
       <artifactId>spring-boot-configuration-processor</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-validation</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>pl.project13.maven</groupId>
+        <artifactId>git-commit-id-plugin</artifactId>
+      </plugin>
       <plugin>
         <groupId>io.fabric8</groupId>
         <artifactId>docker-maven-plugin</artifactId>
-        <version>0.33.0</version>
+        <version>0.45.0</version>
         <configuration>
           <images>
             <image>
@@ -93,9 +97,6 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <artifactId>maven-failsafe-plugin</artifactId>
-      </plugin>
     </plugins>
   </build>
 
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index a4cc8b8..b8caf5c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -9,13 +9,29 @@ import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
 
 
-@ConfigurationProperties(prefix = "simple.consumer")
+@ConfigurationProperties(prefix = "juplo")
 @Validated
 @Getter
 @Setter
 public class ApplicationProperties
 {
   @NotNull
-  @NotEmpty
-  private String topic;
+  private ConsumerProperties consumer;
+
+
+  public ConsumerProperties getConsumerProperties()
+  {
+    return consumer;
+  }
+
+
+  @Validated
+  @Getter
+  @Setter
+  static class ConsumerProperties
+  {
+    @NotNull
+    @NotEmpty
+    private String topic;
+  }
 }
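
The unchanged context lines in this hunk still import javax.validation, while the pom.xml in this commit moves the parent to Spring Boot 3.3.4, where bean-validation constraints live in the Jakarta namespace. A minimal sketch of how the same class reads with jakarta imports, assuming everything else stays exactly as in the hunk above:

    package de.juplo.kafka;

    import jakarta.validation.constraints.NotEmpty;
    import jakarta.validation.constraints.NotNull;
    import lombok.Getter;
    import lombok.Setter;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.validation.annotation.Validated;


    @ConfigurationProperties(prefix = "juplo")
    @Validated
    @Getter
    @Setter
    public class ApplicationProperties
    {
      @NotNull
      private ConsumerProperties consumer;


      public ConsumerProperties getConsumerProperties()
      {
        return consumer;
      }


      @Validated
      @Getter
      @Setter
      static class ConsumerProperties
      {
        // Validation of the nested topic property works as before, only the package changes
        @NotNull
        @NotEmpty
        private String topic;
      }
    }
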
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 9344bcf..e4b97a4 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -2,39 +2,46 @@ package de.juplo.kafka;
 
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
 
+import java.time.Duration;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 
 @SpringBootTest(
-    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "simple.consumer.topic=" + TOPIC })
-@EmbeddedKafka(topics = TOPIC)
+        "spring.kafka.consumer.auto-offset-reset=earliest",
+        "juplo.consumer.topic=" + TOPIC })
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 public class ApplicationTests
 {
-  public static final String TOPIC = "FOO";
-
-  @LocalServerPort
-  private int port;
+  static final String TOPIC = "FOO";
+  static final int PARTITIONS = 10;
 
   @Autowired
-  private TestRestTemplate restTemplate;
+  MockMvc mockMvc;
 
 
 
   @Test
   public void testApplicationStartup()
   {
-    restTemplate.getForObject(
-        "http://localhost:" + port + "/actuator/health",
-        String.class
-        )
-        .contains("UP");
+    await("Application is healthy")
+      .atMost(Duration.ofSeconds(5))
+      .untilAsserted(() -> mockMvc
+        .perform(get("/actuator/health"))
+        .andExpect(status().isOk())
+        .andExpect(jsonPath("status").value("UP")));
   }
 }