From 6433115ccdf36a4c00b5b7e20c50b6c279581b2f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Mar 2026 15:47:41 +0100 Subject: [PATCH] =?utf8?q?Batching=20zeitnah=20versendeter=20Nachrichten?= =?utf8?q?=20erm=C3=B6glicht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Durch den Aufruf der blockierenden Methode `get()` wurde erzwungen, das der Versand einer Nachricht von Kafka bestätigt wurde, bevor die nächste Nachricht an Kafka übergeben werden konnte. * Der blockierende Aufruf wurde jetzt in eine getrennte asynchrone Verarbeitung (in dem Default-Thread-Pool) verschoben. * Dadurch können jetzt mehrere für den Versand bereitgestellte Nachrichten hintereinander an Kafka übergeben werden. * _Beachte:_ Der Effekt wird erst dann wirklich deutlich, wenn das Throttling über `Thread.sleep(500)` deaktiviert wird! * _Beachte:_ Da die Verarbeitung der Rückmeldung von Kafka über den Default-Thread-Pool erfolgt, erfolgt die Erzeugung der Log-Ausgaben asynchron in verschiedenen Threads. D.h., die Log-Meldungen werden _nicht_ in der Reihenfolge ausgegeben, in der die Nachrichten versendet wurden, was auf den ersten Blick sehr verwirrend ist! --- .../java/de/juplo/kafka/ExampleProducer.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index ca5ba678..3a5f94af 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -81,26 +81,30 @@ public class ExampleProducer semaphore.acquire(); CompletableFuture completableFuture = CompletableFuture .supplyAsync(() -> + { + Future recordMetadataFuture = producer.send(record); + long sendRequestQueued = System.currentTimeMillis(); + semaphore.release(); + log.trace( + "{} - Queued message {}={}, latency={}ms", + id, + key, + value, + sendRequestQueued - sendRequested + ); + return recordMetadataFuture; + }, executor) + .thenApplyAsync(recordMetadataFuture -> { try { - Future result = producer.send(record); - long sendRequestQueued = System.currentTimeMillis(); - semaphore.release(); - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - sendRequestQueued - sendRequested - ); - return result.get(); + return recordMetadataFuture.get(); } catch (Exception e) { throw new RuntimeException(e); } - }, executor); + }); completableFuture.whenComplete((metadata, e) -> { -- 2.39.5