From 03861921288e4a908e03e1830a2cde70d7a73db3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 13 Apr 2025 12:17:15 +0200 Subject: [PATCH] Auf die Autoconfiguration von Spring Boot umgestellt --- docker/docker-compose.yml | 18 ++++++++---- pom.xml | 9 ++---- .../juplo/kafka/ApplicationConfiguration.java | 29 ++++--------------- .../de/juplo/kafka/ApplicationProperties.java | 16 ---------- src/main/resources/application.yml | 21 +++++++------- .../java/de/juplo/kafka/ApplicationTests.java | 5 +--- 6 files changed, 31 insertions(+), 67 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5d1fa466..e5b9a9fe 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -142,22 +142,28 @@ services: consumer: image: juplo/spring-consumer:1.1-generics-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: consumer + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test peter: image: juplo/spring-consumer:1.1-generics-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: peter + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test ute: image: juplo/spring-consumer:1.1-generics-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: ute + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test volumes: diff --git a/pom.xml b/pom.xml index 88730f1d..250f40d9 100644 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c525b111..e523a203 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,14 +1,12 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - -import java.util.Properties; +import org.springframework.kafka.core.ConsumerFactory; @Configuration @@ -19,35 +17,20 @@ public class ApplicationConfiguration public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, ApplicationProperties properties, + KafkaProperties kafkaProperties, ConfigurableApplicationContext applicationContext) { return new ExampleConsumer<>( - properties.getClientId(), + kafkaProperties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, () -> applicationContext.close()); } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public Consumer kafkaConsumer(ConsumerFactory consumerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); + return consumerFactory.createConsumer(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9f..22c755e8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,8 +7,6 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; -import java.time.Duration; - @ConfigurationProperties(prefix = "juplo") @Validated @@ -16,13 +14,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ConsumerProperties consumer; @@ -38,15 +29,8 @@ public class ApplicationProperties @Setter static class ConsumerProperties { - @NotNull - @NotEmpty - private String groupId; @NotNull @NotEmpty private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..71dddda3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ juplo: - bootstrap-server: :9092 - client-id: DEV consumer: - group-id: my-group topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s management: endpoint: shutdown: @@ -21,13 +16,17 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} + bootstrap-server: ${spring.kafka.bootstrap-servers} + client-id: ${spring.kafka.client-id} + group-id: ${spring.kafka.consumer.group-id} + topic: ${simple.consumer.topic} + auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV consumer: - group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} + group-id: my-group logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ae119bff..fae9ae34 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -17,10 +17,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -@SpringBootTest( - properties = { - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.consumer.topic=" + TOPIC }) +@SpringBootTest(properties = { "juplo.consumer.topic=" + TOPIC }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests -- 2.20.1