From d8f4e839b57b8405ca909b5472a8d4639937a101 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Apr 2022 11:08:37 +0200 Subject: [PATCH] =?utf8?q?Springify:=20Konfiguration=20erfolgt=20=C3=BCber?= =?utf8?q?=20`KafkaProperties`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 ++++ .../juplo/kafka/ApplicationConfiguration.java | 20 ++++++++++-------- .../de/juplo/kafka/ApplicationProperties.java | 12 ----------- src/main/resources/application.yml | 20 +++++++++++------- .../java/de/juplo/kafka/ApplicationTests.java | 21 +++++++++++-------- 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/pom.xml b/pom.xml index f218085..21466ec 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.projectlombok lombok diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..3c526df 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -15,7 +16,7 @@ import java.util.function.Consumer; @Configuration -@EnableConfigurationProperties(ApplicationProperties.class) +@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { @Bean @@ -32,13 +33,14 @@ public class ApplicationConfiguration KafkaConsumer kafkaConsumer, ExecutorService executor, Consumer> handler, - ApplicationProperties properties) + KafkaProperties kafkaProperties, + ApplicationProperties applicationProperties) { return new EndlessConsumer<>( executor, - properties.getClientId(), - properties.getTopic(), + kafkaProperties.getConsumer().getClientId(), + applicationProperties.getTopic(), kafkaConsumer, handler); } @@ -50,14 +52,14 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(KafkaProperties properties) { Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("group.id", properties.getGroupId()); - props.put("client.id", properties.getClientId()); - props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers()); + props.put("group.id", properties.getConsumer().getGroupId()); + props.put("client.id", properties.getConsumer().getClientId()); + props.put("auto.offset.reset", properties.getConsumer().getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", LongDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index fa731c5..c7c4f78 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -15,19 +15,7 @@ import javax.validation.constraints.NotNull; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String clientId; @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String autoOffsetReset; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9f3cb81..07fb162 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,9 +1,5 @@ consumer: - bootstrap-server: :9092 - group-id: my-group - client-id: DEV topic: test - auto-offset-reset: earliest management: endpoint: shutdown: @@ -19,11 +15,19 @@ management: enabled: true info: kafka: - bootstrap-server: ${consumer.bootstrap-server} - client-id: ${consumer.client-id} - group-id: ${consumer.group-id} + bootstrap-server: ${spring.kafka.consumer.bootstrap-servers} + client-id: ${spring.kafka.consumer.client-id} + group-id: ${spring.kafka.consumer.group-id} topic: ${consumer.topic} - auto-offset-reset: ${consumer.auto-offset-reset} + auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} +spring: + kafka: + consumer: + bootstrap-servers: :9092 + client-id: DEV + auto-offset-reset: earliest + group-id: my-group + value-deserializer: org.apache.kafka.common.serialization.LongDeserializer logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 40dc149..d446bbe 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -39,7 +40,7 @@ import static org.awaitility.Awaitility.*; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { - "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j @@ -60,7 +61,9 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - ApplicationProperties properties; + ApplicationProperties applicationProperties; + @Autowired + KafkaProperties kafkaProperties; @Autowired ExecutorService executor; @@ -255,8 +258,8 @@ class ApplicationTests endlessConsumer = new EndlessConsumer<>( executor, - properties.getClientId(), - properties.getTopic(), + kafkaProperties.getConsumer().getClientId(), + applicationProperties.getTopic(), kafkaConsumer, captureOffsetAndExecuteTestHandler); @@ -288,10 +291,10 @@ class ApplicationTests } @Bean - KafkaProducer kafkaProducer(ApplicationProperties properties) + KafkaProducer kafkaProducer(KafkaProperties properties) { Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers()); props.put("linger.ms", 100); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); @@ -300,12 +303,12 @@ class ApplicationTests } @Bean - KafkaConsumer offsetConsumer(ApplicationProperties properties) + KafkaConsumer offsetConsumer(KafkaProperties properties) { Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", properties.getConsumer().getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); -- 2.20.1