Springify: Konfiguration erfolgt über `KafkaProperties`
authorKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:08:37 +0000 (11:08 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:08:37 +0000 (11:08 +0200)
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index f218085..21466ec 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
index 4054e93..3c526df 100644 (file)
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -15,7 +16,7 @@ import java.util.function.Consumer;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
@@ -32,13 +33,14 @@ public class ApplicationConfiguration
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
       Consumer<ConsumerRecord<String, Long>> handler,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getConsumer().getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             handler);
   }
@@ -50,14 +52,14 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Long> kafkaConsumer(KafkaProperties properties)
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
+    props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
+    props.put("group.id", properties.getConsumer().getGroupId());
+    props.put("client.id", properties.getConsumer().getClientId());
+    props.put("auto.offset.reset", properties.getConsumer().getAutoOffsetReset());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());
index fa731c5..c7c4f78 100644 (file)
@@ -15,19 +15,7 @@ import javax.validation.constraints.NotNull;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
 }
index 9f3cb81..07fb162 100644 (file)
@@ -1,9 +1,5 @@
 consumer:
-  bootstrap-server: :9092
-  group-id: my-group
-  client-id: DEV
   topic: test
-  auto-offset-reset: earliest
 management:
   endpoint:
     shutdown:
@@ -19,11 +15,19 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${consumer.bootstrap-server}
-    client-id: ${consumer.client-id}
-    group-id: ${consumer.group-id}
+    bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+    client-id: ${spring.kafka.consumer.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
     topic: ${consumer.topic}
-    auto-offset-reset: ${consumer.auto-offset-reset}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+spring:
+  kafka:
+    consumer:
+      bootstrap-servers: :9092
+      client-id: DEV
+      auto-offset-reset: earliest
+      group-id: my-group
+      value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
 logging:
   level:
     root: INFO
index 40dc149..d446bbe 100644 (file)
@@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
@@ -39,7 +40,7 @@ import static org.awaitility.Awaitility.*;
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
                properties = {
-                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
@@ -60,7 +61,9 @@ class ApplicationTests
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
-       ApplicationProperties properties;
+       ApplicationProperties applicationProperties;
+       @Autowired
+       KafkaProperties kafkaProperties;
        @Autowired
        ExecutorService executor;
 
@@ -255,8 +258,8 @@ class ApplicationTests
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
+                                               kafkaProperties.getConsumer().getClientId(),
+                                               applicationProperties.getTopic(),
                                                kafkaConsumer,
                                                captureOffsetAndExecuteTestHandler);
 
@@ -288,10 +291,10 @@ class ApplicationTests
                }
 
                @Bean
-               KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+               KafkaProducer<String, Bytes> kafkaProducer(KafkaProperties properties)
                {
                        Properties props = new Properties();
-                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
                        props.put("linger.ms", 100);
                        props.put("key.serializer", StringSerializer.class.getName());
                        props.put("value.serializer", BytesSerializer.class.getName());
@@ -300,12 +303,12 @@ class ApplicationTests
                }
 
                @Bean
-               KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+               KafkaConsumer<Bytes, Bytes> offsetConsumer(KafkaProperties properties)
                {
                        Properties props = new Properties();
-                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
                        props.put("client.id", "OFFSET-CONSUMER");
-                       props.put("group.id", properties.getGroupId());
+                       props.put("group.id", properties.getConsumer().getGroupId());
                        props.put("key.deserializer", BytesDeserializer.class.getName());
                        props.put("value.deserializer", BytesDeserializer.class.getName());