From: Kai Moritz Date: Wed, 25 Sep 2024 23:03:26 +0000 (+0200) Subject: TMP X-Git-Tag: producer/spring-producer--Shutdown-Hängt~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c4d30b707dcaee0f538b58412faeab1941c639df;p=demos%2Fkafka%2Ftraining TMP --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index d269945..aba7709 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -22,8 +22,6 @@ public class Application implements ApplicationRunner @Autowired ThreadPoolTaskExecutor taskExecutor; @Autowired - Producer kafkaProducer; - @Autowired ExampleProducer exampleProducer; @Autowired ConfigurableApplicationContext context; @@ -31,33 +29,23 @@ public class Application implements ApplicationRunner ListenableFuture consumerJob; @Override - public void run(ApplicationArguments args) throws Exception + public void run(ApplicationArguments args) { - log.info("Starting SimpleConsumer"); + log.info("Starting ExampleProducer"); consumerJob = taskExecutor.submitListenable(exampleProducer); consumerJob.addCallback( exitStatus -> { - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); + log.info("ExampleProducer exited normally, exit-status: {}", exitStatus); SpringApplication.exit(context, () -> exitStatus); }, t -> { - log.error("SimpleConsumer exited abnormally!", t); + log.error("ExampleProducer exited abnormally!", t); SpringApplication.exit(context, () -> 2); }); } - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling ExampleProducer to quit its work"); - exampleProducer.shutdown(); - log.info("Waiting for ExampleProducer to finish its work"); - consumerJob.get(); - log.info("ExampleProducer finished its work"); - } - public static void main(String[] args) { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 6bd5cd5..2ecf1e7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,32 +1,24 @@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextClosedEvent; import java.util.Properties; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j public class ApplicationConfiguration { @Bean - public ExampleProducer exampleProducer( - ApplicationProperties properties, - KafkaProducer kafkaProducer) - { - return - new ExampleProducer( - properties.getClientId(), - properties.getTopic(), - kafkaProducer); - } - - @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public ExampleProducer exampleProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -40,6 +32,19 @@ public class ApplicationConfiguration props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - return new KafkaProducer<>(props); + return + new ExampleProducer( + properties.getClientId(), + properties.getTopic(), + new KafkaProducer<>(props)); + } + + @Bean + public ApplicationListener shutdownHandler(ExampleProducer exampleProducer) + { + return event -> { + log.info("Shutdown is in progress... Signalling ExampleProducer to quit its work!"); + exampleProducer.shutdown(); + }; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index d1f1bf9..826e1ac 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -38,8 +38,13 @@ public class ExampleProducer implements Callable log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced); return 1; } + finally + { + log.info("{}: Closing the KafkaProducer", id); + producer.close(); + log.info("{}: Produced {} messages in total, exiting!", id, produced); + } - log.info("{}: Produced {} messages in total, exiting!", id, produced); return 0; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 726204e..1afbe1c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,7 +30,6 @@ info: compression-type: ${producer.compression-type} logging: level: - root: INFO - de.juplo: DEBUG + root: TRACE server: port: 8880