From: Kai Moritz
Date: Thu, 14 Nov 2024 20:33:44 +0000 (+0100)
Subject: Conversion of the `spring-consumer` to the `@KafkaListener`
X-Git-Tag: spring/spring-consumer--kafkalistener--REBASE-ANFANG
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=481e188e1b42572e81e7f4907d26239e3fc949d9;p=demos%2Fkafka%2Ftraining

Conversion of the `spring-consumer` to the `@KafkaListener`
---

diff --git a/README.sh b/README.sh
index b46e235..6b1d575 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 6bd2766..5e5c055 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -199,10 +199,12 @@ services:
       juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-kafkalistener-SNAPSHOT
     environment:
-      juplo.bootstrap-server: kafka:9092
-      juplo.client-id: consumer
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: consumer
+      spring.kafka.consumer.auto-offset-reset: earliest
+      logging.level.org.apache.kafka.clients.consumer: INFO
       juplo.consumer.topic: test
 
 volumes:
diff --git a/pom.xml b/pom.xml
index 98a0a36..a5969cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-kafkalistener-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
@@ -40,8 +40,8 @@
       <artifactId>spring-boot-starter-validation</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
@@ -52,11 +52,6 @@
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
deleted file mode 100644
index a4856a6..0000000
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.StickyAssignor;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Properties;
-
-
-@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
-public class ApplicationConfiguration
-{
-  @Bean
-  public ExampleConsumer exampleConsumer(
-    Consumer<String, String> kafkaConsumer,
-    ApplicationProperties properties,
-    ConfigurableApplicationContext applicationContext)
-  {
-    return
-      new ExampleConsumer(
-        properties.getClientId(),
-        properties.getConsumerProperties().getTopic(),
-        kafkaConsumer,
-        () -> applicationContext.close());
-  }
-
-  @Bean(destroyMethod = "")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
-  {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("group.id", properties.getConsumerProperties().getGroupId());
-    if (properties.getConsumerProperties().getAutoOffsetReset() != null)
-    {
-      props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
-    }
-    if (properties.getConsumerProperties().getAutoCommitInterval() != null)
-    {
-      props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
-    }
-    props.put("metadata.maxage.ms", 5000); // 5 Sekunden
-    props.put("partition.assignment.strategy", StickyAssignor.class.getName());
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
-
-    return new KafkaConsumer<>(props);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c8193c9..22c755e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -7,8 +7,6 @@ import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.validation.annotation.Validated;
 
-import java.time.Duration;
-
 
 @ConfigurationProperties(prefix = "juplo")
 @Validated
@@ -16,13 +14,6 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String clientId;
-
   @NotNull
   private ConsumerProperties consumer;
 
@@ -38,15 +29,8 @@ public class ApplicationProperties
   @Setter
   static class ConsumerProperties
   {
-    @NotNull
-    @NotEmpty
-    private String groupId;
     @NotNull
     @NotEmpty
     private String topic;
-    private OffsetReset autoOffsetReset;
-    private Duration autoCommitInterval;
-
-    enum OffsetReset { latest, earliest, none }
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index f832b45..5786c4b 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -1,107 +1,36 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-
-import java.time.Duration;
-import java.util.Arrays;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+@Component
+public class ExampleConsumer
 {
-  private final String id;
-  private final String topic;
-  private final Consumer<String, String> consumer;
-  private final Thread workerThread;
-  private final Runnable closeCallback;
-
-  private volatile boolean running = false;
+  @Value("${spring.kafka.client-id}")
+  private String id;
   private long consumed = 0;
-
-  public ExampleConsumer(
-    String clientId,
-    String topic,
-    Consumer<String, String> consumer,
-    Runnable closeCallback)
-  {
-    this.id = clientId;
-    this.topic = topic;
-    this.consumer = consumer;
-
-    workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
-    workerThread.start();
-
-    this.closeCallback = closeCallback;
-  }
-
-
-  @Override
-  public void run()
-  {
-    try
-    {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
-      running = true;
-
-      while (running)
-      {
-        ConsumerRecords<String, String> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
-        {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
-        }
-      }
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - Consumer was signaled to finish its work", id);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error, unsubscribing!", id, e);
-      consumer.unsubscribe();
-      log.info("{} - Triggering exit of application!", id);
-      new Thread(closeCallback).start();
-    }
-    finally
-    {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
-  }
-
+  @KafkaListener(topics = "${juplo.consumer.topic}")
   private void handleRecord(
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+    @Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
+    @Header(KafkaHeaders.RECEIVED_KEY) String key,
+    @Payload String value)
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
   }
-
-
-  public void shutdown() throws InterruptedException
-  {
-    log.info("{} joining the worker-thread...", id);
-    running = false;
-    consumer.wakeup();
-    workerThread.join();
-  }
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 7a06731..71dddda 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,11 +1,6 @@
 juplo:
-  bootstrap-server: :9092
-  client-id: DEV
   consumer:
-    group-id: my-group
     topic: test
-    auto-offset-reset: earliest
-    auto-commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -21,13 +16,17 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${juplo.bootstrap-server}
-    client-id: ${juplo.client-id}
+    bootstrap-server: ${spring.kafka.bootstrap-servers}
+    client-id: ${spring.kafka.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
+    topic: ${simple.consumer.topic}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
     consumer:
-      group-id: ${juplo.consumer.group-id}
-      topic: ${juplo.consumer.topic}
-      auto-offset-reset: ${juplo.consumer.auto-offset-reset}
-      auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+      group-id: my-group
 logging:
   level:
     root: INFO
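
Side note, not part of the commit above: Spring Kafka also accepts the complete ConsumerRecord as the parameter of a @KafkaListener method, so the @Header/@Payload arguments introduced in ExampleConsumer could be replaced by a single record argument. The following is only a minimal sketch of that variant; the class name ExampleRecordConsumer is made up for illustration, while the property juplo.consumer.topic is taken from the diff.

package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative variant only: the listener method receives the whole
// ConsumerRecord, so topic, partition, offset, key and value are read
// from the record instead of being injected via @Header/@Payload.
@Slf4j
@Component
public class ExampleRecordConsumer
{
  private long consumed = 0;

  @KafkaListener(topics = "${juplo.consumer.topic}")
  public void handleRecord(ConsumerRecord<String, String> record)
  {
    consumed++;
    log.info(
        "{}: {}/{} - {}={}",
        record.offset(),
        record.topic(),
        record.partition(),
        record.key(),
        record.value());
  }
}

Functionally this logs the same information as the handleRecord method from the diff; it merely trades the header annotations for direct access to the record.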