Springify: Konfiguration erfolgt über `KafkaProperties`
author Kai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:08:37 +0000 (11:08 +0200)
committer Kai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 07:47:58 +0000 (09:47 +0200)
* Per `git cherry-pick` aus `springified-consumer--config` übernommen.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/ApplicationProperties.java
** src/main/resources/application.yml
** src/test/java/de/juplo/kafka/ApplicationTests.java
* Anpassungen an `README.sh`, `docker-compose.yml` und `pom.xml` nachgeholt.

README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 22f52f0..07e36d7 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder-json:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index 5d33cd1..16fec5b 100644 (file)
@@ -96,13 +96,13 @@ services:
     command: sleep infinity
 
   gateway:
-    image: juplo/sumup-gateway:1.0-SNAPSHOT
+    image: juplo/sumup-gateway--springified:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      sumup.gateway.bootstrap-server: kafka:9092
-      sumup.gateway.client-id: gateway
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: gateway
       sumup.gateway.topic: in
 
   requests-1:
@@ -124,28 +124,28 @@ services:
       sumup.requests.client-id: requests-2
 
   adder-1:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8091:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-1
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: adder-1
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
       logging.level.org.apache.kafka.clients.consumer: DEBUG
 
   adder-2:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8092:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-2
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: adder-2
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index 17d3cba..a252d1c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder-json</artifactId>
+  <artifactId>sumup-adder-springified</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>SumUp Adder</name>
   <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
index 156b5a0..523707f 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,18 +15,19 @@ import java.util.concurrent.Executors;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
   public ApplicationRecordHandler recordHandler(
       AdderResults adderResults,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(properties.getThrottle()),
-        properties.getClientId());
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -39,13 +41,14 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId());
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -54,13 +57,14 @@ public class ApplicationConfiguration
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             rebalanceListener,
             recordHandler);
@@ -73,17 +77,17 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", "1000");
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
+    props.put("client.id", kafkaProperties.getClientId());
+    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
+    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", JsonDeserializer.class.getName());
     props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
index f852c00..005460c 100644 (file)
@@ -16,22 +16,8 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
-  @NotNull
-  private Duration commitInterval;
   private Duration throttle;
 }
index 26948f5..a899340 100644 (file)
@@ -1,11 +1,6 @@
 sumup:
   adder:
-    bootstrap-server: :9092
-    group-id: my-group
-    client-id: DEV
     topic: out
-    auto-offset-reset: earliest
-    commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -21,16 +16,25 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${consumer.bootstrap-server}
-    client-id: ${consumer.client-id}
-    group-id: ${consumer.group-id}
+    bootstrap-server: ${spring.kafka.bootstrap-servers}
+    client-id: ${spring.kafka.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
     topic: ${consumer.topic}
-    auto-offset-reset: ${consumer.auto-offset-reset}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
       uri: mongodb://juplo:training@localhost:27017
       database: juplo
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
+    consumer:
+      group-id: my-group
+      auto-offset-reset: earliest
+      auto-commit-interval: 5s
+      properties:
+        metadata.max.age.ms: 1000
 logging:
   level:
     root: INFO
index dcac79b..4bb4f5b 100644 (file)
@@ -14,7 +14,7 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
index 8849317..869b5d9 100644 (file)
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
@@ -40,9 +41,9 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
                properties = {
-                               "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
-                               "sumup.adder.commit-interval=500ms",
+                               "spring.kafka.consumer.auto-commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -59,7 +60,9 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        Consumer<ConsumerRecord<K, V>> consumer;
        @Autowired
-       ApplicationProperties properties;
+       ApplicationProperties applicationProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
        @Autowired
        ExecutorService executor;
        @Autowired
@@ -330,16 +333,16 @@ abstract class GenericApplicationTests<K, V>
        {
                Properties props;
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
                props.put("linger.ms", 100);
                props.put("key.serializer", BytesSerializer.class.getName());
                props.put("value.serializer", BytesSerializer.class.getName());
                testRecordProducer = new KafkaProducer<>(props);
 
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
                props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", properties.getGroupId());
+               props.put("group.id", kafkaProperties.getConsumer().getGroupId());
                props.put("key.deserializer", BytesDeserializer.class.getName());
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
@@ -373,8 +376,8 @@ abstract class GenericApplicationTests<K, V>
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
+                                               kafkaProperties.getClientId(),
+                                               applicationProperties.getTopic(),
                                                kafkaConsumer,
                                                rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);