From c1fb1a3453b811f664d5551c2e0d21f2a75ff076 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 29 Sep 2024 21:41:54 +0200 Subject: [PATCH] `ExampleConsumer` als Spring-Boot App (ohne Spring Kafka) --- pom.xml | 9 ++++- .../juplo/kafka/ApplicationConfiguration.java | 39 +++++++++++++------ .../de/juplo/kafka/ApplicationProperties.java | 15 +++++++ src/main/resources/application.yml | 27 +++++-------- 4 files changed, 60 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 63651ca..98a0a36 100644 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.springframework.kafka - spring-kafka + org.apache.kafka + kafka-clients org.projectlombok @@ -52,6 +52,11 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka + test + org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 21e13a7..b0d1668 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,34 +1,51 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.ConsumerFactory; +import java.util.Properties; @Configuration -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) +@EnableConfigurationProperties(ApplicationProperties.class) public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaConsumer kafkaConsumer, + ApplicationProperties properties) { return new ExampleConsumer( kafkaConsumer, - kafkaProperties.getClientId(), - applicationProperties.getTopics()); + properties.getClientId(), + properties.getTopics()); } @Bean - public Consumer kafkaConsumer(ConsumerFactory factory) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { - return factory.createConsumer(); + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("group.id", properties.getGroupId()); + if (properties.getAutoOffsetReset() != null) + { + props.put("auto.offset.reset", properties.getAutoOffsetReset().name()); + } + if (properties.autoCommitInterval != null) + { + props.put("auto.commit.interval", properties.getAutoCommitInterval()); + } + props.put("metadata.maxage.ms", 5000); // 5 Sekunden + props.put("partition.assignment.strategy", StickyAssignor.class.getName()); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + return new KafkaConsumer(props); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 463ea89..8183bc1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,8 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import java.time.Duration; + @ConfigurationProperties(prefix = "juplo.consumer") @Validated @@ -14,7 +16,20 @@ import org.springframework.validation.annotation.Validated; @Setter public class ApplicationProperties { + @NotNull + @NotEmpty + private String bootstrapServer; + @NotNull + @NotEmpty + private String clientId; + @NotNull + @NotEmpty + private String groupId; @NotNull @NotEmpty private String[] topics; + ApplicationProperties.OffsetReset autoOffsetReset; + Duration autoCommitInterval; + + enum OffsetReset { latest, earliest, none} } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 81fdca8..201aefa 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,11 @@ juplo: consumer: + bootstrap-server: :9092 + client-id: DEV + group-id: my-group topics: test + auto-offset-reset: earliest + auto-commit-interval: 5s management: endpoint: shutdown: @@ -16,24 +21,12 @@ management: enabled: true info: kafka: - bootstrap-server: ${spring.kafka.bootstrap-servers} - client-id: ${spring.kafka.client-id} - group-id: ${spring.kafka.consumer.group-id} + bootstrap-server: ${juplo.consumer.bootstrap-server} + client-id: ${juplo.consumer.client-id} + group-id: ${juplo.consumer.group-id} topics: ${juplo.consumer.topics} - auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} -spring: - kafka: - bootstrap-servers: :9092 - client-id: DEV - consumer: - group-id: my-group - auto-offset-reset: earliest - auto-commit-interval: 5s - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - properties: - partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor - metadata.max.age.ms: 1000 + auto-offset-reset: ${juplo.consumer.auto-offset-reset} + auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: level: root: INFO -- 2.20.1