]> juplo.de Git - demos/kafka/training/commitdiff
Fix: `close()` muss noch vom `ExampleProducer` aufgerufen werden
authorKai Moritz <kai@juplo.de>
Fri, 1 Nov 2024 14:12:00 +0000 (15:12 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 21 Jan 2025 20:13:37 +0000 (21:13 +0100)
* Der Aufruf von `close()` löst das Versenden wartender Nachrichten aus.
* Wenn die Methode erst von Spring aufgerufen wird, werden ggf. noch
  Nachrichten konsumiert, nachdem der `ExampleProducer` bereits
  ausgegeben hat, wieviele Nachrichten er insgesamt verarbeitet hat.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleProducer.java

index 3d775f086cf321b72b7f49b9a1df57ef22d6615a..5a9bd82f85ab71fb1592a6deb21c853fc7502a92 100644 (file)
@@ -33,7 +33,7 @@ public class ApplicationConfiguration
             () -> applicationContext.close());
   }
 
-  @Bean
+  @Bean(destroyMethod = "")
   public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
index 4e167ce01b5bb74f366026c641f84a4c545c9230..3fcd05d181979b64349080dd8b8c8b7f45f75e73 100644 (file)
@@ -72,6 +72,8 @@ public class ExampleProducer implements Runnable
     }
     finally
     {
+      log.info("{}: Closing the KafkaProducer", id);
+      producer.close();
       log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
   }