From 17b25ab1fb29e6445a25dbc604f261322bdda0ec Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 12 Nov 2022 17:10:53 +0100 Subject: [PATCH] `@KafkaListener`-Version des `spring-consumer` --- README.sh | 14 +--- docker-compose.yml | 12 ++- pom.xml | 2 +- src/main/java/de/juplo/kafka/Application.java | 42 +--------- .../juplo/kafka/ApplicationConfiguration.java | 34 --------- .../java/de/juplo/kafka/SimpleConsumer.java | 76 +++++-------------- 6 files changed, 37 insertions(+), 143 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java diff --git a/README.sh b/README.sh index 53d1aa7..c0a65f0 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -25,12 +25,6 @@ else fi docker-compose up setup -docker-compose up -d producer - -mvn spring-boot:run & -sleep 10 -kill $(jobs -p) -mvn spring-boot:run & -sleep 10 -docker-compose stop producer -kill $(jobs -p) +docker-compose up -d consumer +docker-compose up producer +docker-compose logs consumer diff --git a/docker-compose.yml b/docker-compose.yml index bd15793..9ed194b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -103,5 +103,13 @@ services: command: sleep infinity producer: - image: juplo/simple-producer:1.0-SNAPSHOT - command: producer + image: juplo/supersimple-producer:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + + consumer: + image: juplo/spring-consumer-kafkalistener:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer + diff --git a/pom.xml b/pom.xml index cdfb199..e28ae92 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ de.juplo.kafka - spring-consumer + spring-consumer-kafkalistener 1.0-SNAPSHOT Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3828b1d..273cee5 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,50 +1,14 @@ package de.juplo.kafka; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@Slf4j -public class Application implements ApplicationRunner +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - Consumer kafkaConsumer; - @Autowired - SimpleConsumer simpleConsumer; - - Future consumerJob; - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submit(simpleConsumer); - } - - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling SimpleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index 62d61a2..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import org.springframework.kafka.core.ConsumerFactory; - - -@Configuration -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) -public class ApplicationConfiguration -{ - @Bean - public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return - new SimpleConsumer( - kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer); - } - - @Bean - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } -} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 1cf9b22..31c7e6b 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,74 +1,36 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; @Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Runnable +@Component +public class SimpleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - + @Value("${spring.kafka.client-id}") + private String id; private long consumed = 0; - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - + @KafkaListener( + id = "${spring.kafka.client-id}", + groupId = "${spring.kafka.consumer.group-id}", + topics = "${simple.consumer.topic}") private void handleRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Payload String value) { consumed++; -- 2.20.1