From 4cb45d59008b5388d593f2c87e152acc23b7fb86 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Nov 2022 21:40:24 +0100 Subject: [PATCH] Version des Simple-Consumer, mit Auto-Konfiguration durch Spring Kafka --- pom.xml | 10 ++--- .../juplo/kafka/ApplicationConfiguration.java | 23 ++++------- .../de/juplo/kafka/ApplicationProperties.java | 12 ------ src/main/resources/application.yml | 25 ++++++++---- .../java/de/juplo/kafka/ApplicationIT.java | 40 +++++++++++++++++++ 5 files changed, 67 insertions(+), 43 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/ApplicationIT.java diff --git a/pom.xml b/pom.xml index c73251c..cdfb199 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer 1.0-SNAPSHOT Spring Consumer - Super Simple Consumer-Group, that is implemented as a Spring-Boot application + Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka 11 @@ -40,8 +40,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -72,10 +72,6 @@ - - pl.project13.maven - git-commit-id-plugin - io.fabric8 docker-maven-plugin diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 46bb667..62d61a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,43 +1,34 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Properties; +import org.springframework.kafka.core.ConsumerFactory; @Configuration -@EnableConfigurationProperties({ ApplicationProperties.class }) +@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( Consumer kafkaConsumer, + KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new SimpleConsumer( - applicationProperties.getClientId(), + kafkaProperties.getClientId(), applicationProperties.getTopic(), kafkaConsumer); } @Bean - public Consumer kafkaConsumer(ApplicationProperties properties) + public Consumer kafkaConsumer(ConsumerFactory factory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServers()); - props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits - props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung - props.put("auto.offset.reset", properties.getAutoOffsetReset()); - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); + return factory.createConsumer(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 5675db7..a4cc8b8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -15,19 +15,7 @@ import javax.validation.constraints.NotNull; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServers; - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String clientId; @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String autoOffsetReset; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7089338..d524e5f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,10 +1,6 @@ simple: consumer: - bootstrap-servers: ":9092" - group-id: "my-group" - client-id: "DEV" topic: test - auto-offset-reset: earliest management: endpoint: shutdown: @@ -20,11 +16,24 @@ management: enabled: true info: kafka: - bootstrap-server: ${simple.consumer.bootstrap-servers} - client-id: ${simple.consumer.client-id} - group-id: ${simple.consumer.group-id} + bootstrap-server: ${spring.kafka.bootstrap-servers} + client-id: ${spring.kafka.client-id} + group-id: ${spring.kafka.consumer.group-id} topic: ${simple.consumer.topic} - auto-offset-reset: ${simple.consumer.auto-offset-reset} + auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + consumer: + group-id: my-group + auto-offset-reset: earliest + auto-commit-interval: 5s + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor + metadata.max.age.ms: 1000 logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java new file mode 100644 index 0000000..1baca99 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -0,0 +1,40 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static de.juplo.kafka.ApplicationIT.TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "simple.consumer.topic=" + TOPIC }) +@EmbeddedKafka(topics = TOPIC) +public class ApplicationIT +{ + public static final String TOPIC = "FOO"; + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + + + @Test + public void testApplicationStartup() + { + restTemplate.getForObject( + "http://localhost:" + port + "/actuator/health", + String.class + ) + .contains("UP"); + } +} -- 2.20.1