From 1d1ff9b136ca80e520d8bd4c646ef89c0dbf20d9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Mar 2026 11:48:08 +0100 Subject: [PATCH] Fehler korrigiert: Shutdown wartet auf Versand der wartenden Nachrichten MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Über die Verwendung einer Semaphore wird sichergestellt, dass alle für das Queuing übergebenen Nachrichten auch an Kafka übergeben werden, bevor `KafkaProducer.close()` aufgerufen wird * Ansonsten wurden geschedulte Nachrichten teilweise erst an den Producer übergeben, nachdem dieser bereits geschlossen wurde, so dass es zu Fehlern gekommen ist. --- src/main/java/de/juplo/kafka/ExampleProducer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index df804c93..06d5b813 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -28,6 +28,7 @@ public class ExampleProducer new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES), new BlockingRejectedExecutionHandler() ); + private final Semaphore semaphore = new Semaphore(MAX_PENDING_MESSAGES); private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; @@ -59,6 +60,7 @@ public class ExampleProducer send(Long.toString(i%10), Long.toString(i)); Thread.sleep(500); } + semaphore.acquire(MAX_PENDING_MESSAGES); } catch (Exception e) { @@ -73,7 +75,7 @@ public class ExampleProducer } } - void send(String key, String value) + void send(String key, String value) throws InterruptedException { final long sendRequested = System.currentTimeMillis(); @@ -83,6 +85,7 @@ public class ExampleProducer value // Value ); + semaphore.acquire(); CompletableFuture completableFuture = CompletableFuture .supplyAsync(() -> { @@ -90,6 +93,7 @@ public class ExampleProducer { Future result = producer.send(record); long sendRequestQueued = System.currentTimeMillis(); + semaphore.release(); log.trace( "{} - Queued message {}={}, latency={}ms", id, -- 2.39.5