]> juplo.de Git - demos/kafka/training/commitdiff
Fehler korrigiert: Shutdown wartet auf Versand der wartenden Nachrichten
authorKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 10:48:08 +0000 (11:48 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 15:51:53 +0000 (16:51 +0100)
* Über die Verwendung einer Semaphore wird sichergestellt, dass alle
  für das Queuing übergebenen Nachrichten auch an Kafka übergeben werden,
  bevor `KafkaProducer.close()` aufgerufen wird
* Ansonsten wurden geschedulte Nachrichten teilweise erst an den Producer
  übergeben, nachdem dieser bereits geschlossen wurde, so dass es zu
  Fehlern gekommen ist.

src/main/java/de/juplo/kafka/ExampleProducer.java

index df804c934c643ffe66cc23bf9ce61102b90338e8..06d5b8135d1faf3718ef3eea5f5b9469fd559509 100644 (file)
@@ -28,6 +28,7 @@ public class ExampleProducer
     new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
     new BlockingRejectedExecutionHandler()
   );
+  private final Semaphore semaphore = new Semaphore(MAX_PENDING_MESSAGES);
   private volatile boolean running = true;
   private volatile boolean done = false;
   private long produced = 0;
@@ -59,6 +60,7 @@ public class ExampleProducer
         send(Long.toString(i%10), Long.toString(i));
         Thread.sleep(500);
       }
+      semaphore.acquire(MAX_PENDING_MESSAGES);
     }
     catch (Exception e)
     {
@@ -73,7 +75,7 @@ public class ExampleProducer
     }
   }
 
-  void send(String key, String value)
+  void send(String key, String value) throws InterruptedException
   {
     final long sendRequested = System.currentTimeMillis();
 
@@ -83,6 +85,7 @@ public class ExampleProducer
       value   // Value
     );
 
+    semaphore.acquire();
     CompletableFuture<RecordMetadata> completableFuture = CompletableFuture
       .supplyAsync(() ->
       {
@@ -90,6 +93,7 @@ public class ExampleProducer
         {
           Future<RecordMetadata> result = producer.send(record);
           long sendRequestQueued = System.currentTimeMillis();
+          semaphore.release();
           log.trace(
             "{} - Queued message {}={}, latency={}ms",
             id,