From 146a609dccc535dd91e8eeecbd13a9a09d09fad3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 19 Nov 2022 18:29:25 +0100 Subject: [PATCH] `@KafkaListener`-Version des `spring-consumer` mit Business-Logik MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Das README.sh führt beide Fehler vor. * Der Logik-Fehler wird zuerst vorgeführt, da die Poison-Pill dazu führt, dass sich der betroffene Consumer in einer endlosschleife verfängt. * Das README.sh enthält alle Tag's für die kompletten Übungen zu der Fehlerbehandlung in Spring Kafka. --- README.sh | 18 +++-- docker-compose.yml | 4 +- pom.xml | 2 +- .../de/juplo/kafka/AdderBusinessLogic.java | 3 + src/main/java/de/juplo/kafka/Application.java | 56 +------------ .../juplo/kafka/ApplicationConfiguration.java | 52 ------------- .../java/de/juplo/kafka/MessageHandler.java | 13 ++-- .../java/de/juplo/kafka/SimpleConsumer.java | 78 +++++-------------- src/main/resources/application.yml | 1 + 9 files changed, 49 insertions(+), 178 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java diff --git a/README.sh b/README.sh index 872116f..14685bb 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer-json:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer-json-kafkalistener:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -39,11 +39,17 @@ echo "Writing logic error..." # tag::logicerror[] echo 66 | http -v :8080/peter?error=1 # end::logicerror[] + +echo 66 | http -v :8080/peter echo 7 | http -v :8080/klaus -docker-compose logs -f consumer-1 consumer-2 +sleep 5 -echo "Restarting consumer-1..." -# tag::restart[] -docker-compose up consumer-1 -# end::restart[] +echo "Writing poison pill..." +# tag::poisonpill[] +echo 'BOOM!' | kafkacat -P -b :9092 -t test +# end::poisonpill[] + +sleep 5 +docker-compose stop consumer-1 consumer-2 +docker-compose logs -f consumer-1 consumer-2 diff --git a/docker-compose.yml b/docker-compose.yml index f6dbef4..3b6a145 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -113,7 +113,7 @@ services: producer.topic: test consumer-1: - image: juplo/spring-consumer-json:1.0-SNAPSHOT + image: juplo/spring-consumer-json-kafkalistener:1.0-SNAPSHOT ports: - 8081:8080 environment: @@ -124,7 +124,7 @@ services: simple.consumer.topic: test consumer-2: - image: juplo/spring-consumer-json:1.0-SNAPSHOT + image: juplo/spring-consumer-json-kafkalistener:1.0-SNAPSHOT ports: - 8082:8080 environment: diff --git a/pom.xml b/pom.xml index ebf6eb1..24eb90b 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ de.juplo.kafka - spring-consumer-json + spring-consumer-json-kafkalistener 1.0-SNAPSHOT Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java index c0c19a5..70118c2 100644 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java @@ -1,11 +1,14 @@ package de.juplo.kafka; +import org.springframework.stereotype.Component; + import java.util.HashMap; import java.util.Map; import java.util.Optional; +@Component public class AdderBusinessLogic { private final Map state; diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 04dc343..273cee5 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,64 +1,14 @@ package de.juplo.kafka; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.concurrent.ListenableFuture; - -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutionException; +import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@Slf4j -public class Application implements ApplicationRunner +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - Consumer kafkaConsumer; - @Autowired - SimpleConsumer simpleConsumer; - @Autowired - ConfigurableApplicationContext context; - - ListenableFuture consumerJob; - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submitListenable(simpleConsumer); - consumerJob.addCallback( - exitStatus -> - { - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("SimpleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); - } - - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling SimpleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index d292dbc..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import org.springframework.kafka.core.ConsumerFactory; - - -@Configuration -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) -public class ApplicationConfiguration -{ - @Bean - public AdderBusinessLogic adder() - { - return new AdderBusinessLogic(); - } - - @Bean - public MessageHandler messageHandler( - KafkaProperties properties, - AdderBusinessLogic adder) - { - return new MessageHandler( - properties.getClientId(), - adder); - } - - @Bean - public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, - MessageHandler messageHandler, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return - new SimpleConsumer( - kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - messageHandler); - } - - @Bean - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } -} diff --git a/src/main/java/de/juplo/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/MessageHandler.java index 2f58f65..efd3f80 100644 --- a/src/main/java/de/juplo/kafka/MessageHandler.java +++ b/src/main/java/de/juplo/kafka/MessageHandler.java @@ -1,16 +1,19 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; -@RequiredArgsConstructor @Slf4j +@Component public class MessageHandler { - private final String id; - - private final AdderBusinessLogic adder; + @Value("${spring.kafka.client-id}") + private String id; + @Autowired + private AdderBusinessLogic adder; public void addNumber( diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index cea9568..cc7effb 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,78 +1,38 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; @Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Callable +@Component +public class SimpleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - private final MessageHandler messageHandler; + @Value("${spring.kafka.client-id}") + private String id; + @Autowired + private MessageHandler messageHandler; private long consumed = 0; - @Override - public Integer call() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - return 0; - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - return 1; - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - + @KafkaListener(topics = "${simple.consumer.topic}") private void handleRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Payload Message value) { consumed++; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 07d0625..3afdd2f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -41,5 +41,6 @@ logging: level: root: INFO de.juplo: DEBUG + org.springframework.kafka: DEBUG server: port: 8881 -- 2.20.1