Springify: Konfiguration erfolgt über `KafkaProperties`
author Kai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:08:37 +0000 (11:08 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 4 Sep 2022 15:51:40 +0000 (17:51 +0200)
* Per `git cherry-pick` aus `springified-consumer--config` übernommen.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/ApplicationProperties.java
** src/main/resources/application.yml
** src/test/java/de/juplo/kafka/ApplicationTests.java

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 596be26..92c7abe 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -16,18 +17,19 @@ import java.util.concurrent.Executors;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
   public ApplicationRecordHandler recordHandler(
       AdderResults adderResults,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(properties.getThrottle()),
-        properties.getClientId());
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -42,16 +44,17 @@ public class ApplicationConfiguration
       AdderResults adderResults,
       StateRepository stateRepository,
       Consumer<String, Message> consumer,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        properties.getTopic(),
+        kafkaProperties.getClientId(),
+        applicationProperties.getTopic(),
         Clock.systemDefaultZone(),
-        properties.getCommitInterval(),
+        kafkaProperties.getConsumer().getAutoCommitInterval(),
         consumer);
   }
 
@@ -61,13 +64,14 @@ public class ApplicationConfiguration
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             rebalanceListener,
             recordHandler);
@@ -80,17 +84,17 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
+    props.put("client.id", kafkaProperties.getClientId());
     props.put("enable.auto.commit", false);
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("metadata.max.age.ms", "1000");
+    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
+    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", JsonDeserializer.class.getName());
     props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
index f852c00..005460c 100644 (file)
@@ -16,22 +16,8 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
-  @NotNull
-  private Duration commitInterval;
   private Duration throttle;
 }
index 26948f5..a899340 100644 (file)
@@ -1,11 +1,6 @@
 sumup:
   adder:
-    bootstrap-server: :9092
-    group-id: my-group
-    client-id: DEV
     topic: out
-    auto-offset-reset: earliest
-    commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -21,16 +16,25 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${consumer.bootstrap-server}
-    client-id: ${consumer.client-id}
-    group-id: ${consumer.group-id}
+    bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+    client-id: ${spring.kafka.consumer.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
     topic: ${consumer.topic}
-    auto-offset-reset: ${consumer.auto-offset-reset}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
       uri: mongodb://juplo:training@localhost:27017
       database: juplo
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
+    consumer:
+      group-id: my-group
+      auto-offset-reset: earliest
+      auto-commit-interval: 5s
+      properties:
+        metadata.max.age.ms: 1000
 logging:
   level:
     root: INFO
index dcac79b..4bb4f5b 100644 (file)
@@ -14,7 +14,7 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
index a8fa7ea..0e05dbe 100644 (file)
@@ -13,6 +13,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
@@ -39,9 +40,9 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
                properties = {
-                               "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "sumup.adder.topic=" + TOPIC,
-                               "sumup.adder.commit-interval=500ms",
+                               "spring.kafka.consumer.auto-commit-interval=500ms",
                                "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -58,7 +59,9 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        Consumer<ConsumerRecord<K, V>> consumer;
        @Autowired
-       ApplicationProperties properties;
+       ApplicationProperties applicationProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
        @Autowired
        ExecutorService executor;
        @Autowired
@@ -337,16 +340,16 @@ abstract class GenericApplicationTests<K, V>
        {
                Properties props;
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
                props.put("linger.ms", 100);
                props.put("key.serializer", BytesSerializer.class.getName());
                props.put("value.serializer", BytesSerializer.class.getName());
                testRecordProducer = new KafkaProducer<>(props);
 
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
                props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", properties.getGroupId());
+               props.put("group.id", kafkaProperties.getConsumer().getGroupId());
                props.put("key.deserializer", BytesDeserializer.class.getName());
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
@@ -380,8 +383,8 @@ abstract class GenericApplicationTests<K, V>
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
+                                               kafkaProperties.getClientId(),
+                                               applicationProperties.getTopic(),
                                                kafkaConsumer,
                                                rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);