]> juplo.de Git - demos/kafka/training/commitdiff
TMP
authorKai Moritz <kai@juplo.de>
Wed, 25 Sep 2024 23:03:26 +0000 (01:03 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 26 Sep 2024 08:43:05 +0000 (10:43 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml

index d2699457f0940689e93fd96ba109eeaedfe72cc8..aba77099983e86a4269e3775cf4191134f2b0fc6 100644 (file)
@@ -22,8 +22,6 @@ public class Application implements ApplicationRunner
   @Autowired
   ThreadPoolTaskExecutor taskExecutor;
   @Autowired
-  Producer<?, ?> kafkaProducer;
-  @Autowired
   ExampleProducer exampleProducer;
   @Autowired
   ConfigurableApplicationContext context;
@@ -31,33 +29,23 @@ public class Application implements ApplicationRunner
   ListenableFuture<Integer> consumerJob;
 
   @Override
-  public void run(ApplicationArguments args) throws Exception
+  public void run(ApplicationArguments args)
   {
-    log.info("Starting SimpleConsumer");
+    log.info("Starting ExampleProducer");
     consumerJob = taskExecutor.submitListenable(exampleProducer);
     consumerJob.addCallback(
       exitStatus ->
       {
-        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+        log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
         SpringApplication.exit(context, () -> exitStatus);
         },
       t ->
       {
-        log.error("SimpleConsumer exited abnormally!", t);
+        log.error("ExampleProducer exited abnormally!", t);
         SpringApplication.exit(context, () -> 2);
       });
   }
 
-  @PreDestroy
-  public void shutdown() throws ExecutionException, InterruptedException
-  {
-    log.info("Signaling ExampleProducer to quit its work");
-    exampleProducer.shutdown();
-    log.info("Waiting for ExampleProducer to finish its work");
-    consumerJob.get();
-    log.info("ExampleProducer finished its work");
-  }
-
 
   public static void main(String[] args)
   {
index 6bd5cd5b0ec82804914333a3e4576c0a22f344c4..2ecf1e7b1e46df943c60361158f4f6674174aa61 100644 (file)
@@ -1,32 +1,24 @@
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationListener;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.ContextClosedEvent;
 
 import java.util.Properties;
 
 
 @Configuration
 @EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
 public class ApplicationConfiguration
 {
   @Bean
-  public ExampleProducer exampleProducer(
-      ApplicationProperties properties,
-      KafkaProducer<String, String> kafkaProducer)
-  {
-    return
-        new ExampleProducer(
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaProducer);
-  }
-
-  @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public ExampleProducer exampleProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -40,6 +32,19 @@ public class ApplicationConfiguration
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    return new KafkaProducer<>(props);
+    return
+        new ExampleProducer(
+            properties.getClientId(),
+            properties.getTopic(),
+          new KafkaProducer<>(props));
+  }
+
+  @Bean
+  public ApplicationListener<ContextClosedEvent> shutdownHandler(ExampleProducer exampleProducer)
+  {
+    return event -> {
+      log.info("Shutdown is in progress... Signalling ExampleProducer to quit its work!");
+      exampleProducer.shutdown();
+    };
   }
 }
index d1f1bf94a600f63df5fce67753110f89b805e427..826e1acfa392d786599236e495e806d5d6bb9349 100644 (file)
@@ -38,8 +38,13 @@ public class ExampleProducer implements Callable<Integer>
       log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
       return 1;
     }
+    finally
+    {
+      log.info("{}: Closing the KafkaProducer", id);
+      producer.close();
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
+    }
 
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
     return 0;
   }
 
index 726204e177f4d87eeee1cfec7cbcb02ea6d9a739..1afbe1cf820df5d273759b493dc0d3d058c519e7 100644 (file)
@@ -30,7 +30,6 @@ info:
     compression-type: ${producer.compression-type}
 logging:
   level:
-    root: INFO
-    de.juplo: DEBUG
+    root: TRACE
 server:
   port: 8880