From: Kai Moritz Date: Sun, 15 Mar 2026 14:47:41 +0000 (+0100) Subject: Batching zeitnah versendeter Nachrichten ermöglicht X-Git-Tag: grundlagen/simple-producer--completablefuture--2026-03--vor-branchumbenennung--springframework~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=6433115ccdf36a4c00b5b7e20c50b6c279581b2f;p=demos%2Fkafka%2Ftraining Batching zeitnah versendeter Nachrichten ermöglicht * Durch den Aufruf der blockierenden Methode `get()` wurde erzwungen, das der Versand einer Nachricht von Kafka bestätigt wurde, bevor die nächste Nachricht an Kafka übergeben werden konnte. * Der blockierende Aufruf wurde jetzt in eine getrennte asynchrone Verarbeitung (in dem Default-Thread-Pool) verschoben. * Dadurch können jetzt mehrere für den Versand bereitgestellte Nachrichten hintereinander an Kafka übergeben werden. * _Beachte:_ Der Effekt wird erst dann wirklich deutlich, wenn das Throttling über `Thread.sleep(500)` deaktiviert wird! * _Beachte:_ Da die Verarbeitung der Rückmeldung von Kafka über den Default-Thread-Pool erfolgt, erfolgt die Erzeugung der Log-Ausgaben asynchron in verschiedenen Threads. D.h., die Log-Meldungen werden _nicht_ in der Reihenfolge ausgegeben, in der die Nachrichten versendet wurden, was auf den ersten Blick sehr verwirrend ist! --- diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index ca5ba678..3a5f94af 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -81,26 +81,30 @@ public class ExampleProducer semaphore.acquire(); CompletableFuture completableFuture = CompletableFuture .supplyAsync(() -> + { + Future recordMetadataFuture = producer.send(record); + long sendRequestQueued = System.currentTimeMillis(); + semaphore.release(); + log.trace( + "{} - Queued message {}={}, latency={}ms", + id, + key, + value, + sendRequestQueued - sendRequested + ); + return recordMetadataFuture; + }, executor) + .thenApplyAsync(recordMetadataFuture -> { try { - Future result = producer.send(record); - long sendRequestQueued = System.currentTimeMillis(); - semaphore.release(); - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - sendRequestQueued - sendRequested - ); - return result.get(); + return recordMetadataFuture.get(); } catch (Exception e) { throw new RuntimeException(e); } - }, executor); + }); completableFuture.whenComplete((metadata, e) -> {