From: Kai Moritz Date: Tue, 18 Mar 2025 16:13:57 +0000 (+0100) Subject: Umbau auf Autoconfig von Spring Kafka X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fspring%2Fspring-consumer--autoconfig;p=demos%2Fkafka%2Ftraining Umbau auf Autoconfig von Spring Kafka --- diff --git a/pom.xml b/pom.xml index dd96d00f..a5197c8f 100644 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b98c401d..adf6a085 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,55 +1,34 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - -import java.util.Properties; +import org.springframework.kafka.core.ConsumerFactory; @Configuration -@EnableConfigurationProperties(ApplicationProperties.class) public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - ApplicationProperties properties, + @Value("${spring.kafka.client-id}") String clientId, + @Value("${juplo.consumer.topic}") String topic, ConfigurableApplicationContext applicationContext) { return new ExampleConsumer( - properties.getClientId(), - properties.getConsumerProperties().getTopic(), + clientId, + topic, kafkaConsumer, () -> applicationContext.close()); } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public Consumer kafkaConsumer(ConsumerFactory consumerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); + return consumerFactory.createConsumer(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index c8193c9f..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - - @NotNull - private ConsumerProperties consumer; - - - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - - @Validated - @Getter - @Setter - static class ConsumerProperties - { - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..f38bc561 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ juplo: - bootstrap-server: :9092 - client-id: DEV consumer: - group-id: my-group topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s management: endpoint: shutdown: @@ -28,6 +23,12 @@ info: topic: ${juplo.consumer.topic} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + consumer: + group-id: my-group logging: level: root: INFO