TMP
authorKai Moritz <kai@juplo.de>
Wed, 25 Sep 2024 23:03:26 +0000 (01:03 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:43:05 +0000 (10:43 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index d269945..aba7709 100644 (file)
@@ -22,8 +22,6 @@ public class Application implements ApplicationRunner
   @Autowired
   ThreadPoolTaskExecutor taskExecutor;
   @Autowired
-  Producer<?, ?> kafkaProducer;
-  @Autowired
   ExampleProducer exampleProducer;
   @Autowired
   ConfigurableApplicationContext context;
@@ -31,33 +29,23 @@ public class Application implements ApplicationRunner
   ListenableFuture<Integer> consumerJob;
 
   @Override
-  public void run(ApplicationArguments args) throws Exception
+  public void run(ApplicationArguments args)
   {
-    log.info("Starting SimpleConsumer");
+    log.info("Starting ExampleProducer");
     consumerJob = taskExecutor.submitListenable(exampleProducer);
     consumerJob.addCallback(
       exitStatus ->
       {
-        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+        log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
         SpringApplication.exit(context, () -> exitStatus);
         },
       t ->
       {
-        log.error("SimpleConsumer exited abnormally!", t);
+        log.error("ExampleProducer exited abnormally!", t);
         SpringApplication.exit(context, () -> 2);
       });
   }
 
-  @PreDestroy
-  public void shutdown() throws ExecutionException, InterruptedException
-  {
-    log.info("Signaling ExampleProducer to quit its work");
-    exampleProducer.shutdown();
-    log.info("Waiting for ExampleProducer to finish its work");
-    consumerJob.get();
-    log.info("ExampleProducer finished its work");
-  }
-
 
   public static void main(String[] args)
   {
index 6bd5cd5..2ecf1e7 100644 (file)
@@ -1,32 +1,24 @@
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationListener;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.ContextClosedEvent;
 
 import java.util.Properties;
 
 
 @Configuration
 @EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
 public class ApplicationConfiguration
 {
   @Bean
-  public ExampleProducer exampleProducer(
-      ApplicationProperties properties,
-      KafkaProducer<String, String> kafkaProducer)
-  {
-    return
-        new ExampleProducer(
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaProducer);
-  }
-
-  @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public ExampleProducer exampleProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -40,6 +32,19 @@ public class ApplicationConfiguration
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    return new KafkaProducer<>(props);
+    return
+        new ExampleProducer(
+            properties.getClientId(),
+            properties.getTopic(),
+          new KafkaProducer<>(props));
+  }
+
+  @Bean
+  public ApplicationListener<ContextClosedEvent> shutdownHandler(ExampleProducer exampleProducer)
+  {
+    return event -> {
+      log.info("Shutdown is in progress... Signalling ExampleProducer to quit its work!");
+      exampleProducer.shutdown();
+    };
   }
 }
index d1f1bf9..826e1ac 100644 (file)
@@ -38,8 +38,13 @@ public class ExampleProducer implements Callable<Integer>
       log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
       return 1;
     }
+    finally
+    {
+      log.info("{}: Closing the KafkaProducer", id);
+      producer.close();
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
+    }
 
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
     return 0;
   }
 
index 726204e..1afbe1c 100644 (file)
@@ -30,7 +30,6 @@ info:
     compression-type: ${producer.compression-type}
 logging:
   level:
-    root: INFO
-    de.juplo: DEBUG
+    root: TRACE
 server:
   port: 8880