]> juplo.de Git - demos/kafka/training/commitdiff
Batching zeitnah versendeter Nachrichten ermöglicht
authorKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 14:47:41 +0000 (15:47 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:02:05 +0000 (21:02 +0100)
* Durch den Aufruf der blockierenden Methode `get()` wurde erzwungen, das
  der Versand einer Nachricht von Kafka bestätigt wurde, bevor die
  nächste Nachricht an Kafka übergeben werden konnte.
* Der blockierende Aufruf wurde jetzt in eine getrennte asynchrone
  Verarbeitung (in dem Default-Thread-Pool) verschoben.
* Dadurch können jetzt mehrere für den Versand bereitgestellte Nachrichten
  hintereinander an Kafka übergeben werden.
* _Beachte:_ Der Effekt wird erst dann wirklich deutlich, wenn das
  Throttling über `Thread.sleep(500)` deaktiviert wird!
* _Beachte:_ Da die Verarbeitung der Rückmeldung von Kafka über den
  Default-Thread-Pool erfolgt, erfolgt die Erzeugung der Log-Ausgaben
  asynchron in verschiedenen Threads. D.h., die Log-Meldungen werden
  _nicht_ in der Reihenfolge ausgegeben, in der die Nachrichten versendet
  wurden, was auf den ersten Blick sehr verwirrend ist!

src/main/java/de/juplo/kafka/ExampleProducer.java

index 24add8f83aae3291616cd62fc02b6435f2a1ac74..4497d9bdadc53e218a36e91060251828ba26c999 100644 (file)
@@ -81,26 +81,30 @@ public class ExampleProducer
     semaphore.acquire();
     CompletableFuture<RecordMetadata> completableFuture = CompletableFuture
       .supplyAsync(() ->
+      {
+        Future<RecordMetadata> recordMetadataFuture = producer.send(record);
+        long sendRequestQueued = System.currentTimeMillis();
+        semaphore.release();
+        log.trace(
+          "{} - Queued message {}={}, latency={}ms",
+          id,
+          key,
+          value,
+          sendRequestQueued - sendRequested
+        );
+        return recordMetadataFuture;
+      }, executor)
+      .thenApplyAsync(recordMetadataFuture ->
       {
         try
         {
-          Future<RecordMetadata> result = producer.send(record);
-          long sendRequestQueued = System.currentTimeMillis();
-          semaphore.release();
-          log.trace(
-            "{} - Queued message {}={}, latency={}ms",
-            id,
-            key,
-            value,
-            sendRequestQueued - sendRequested
-          );
-          return result.get();
+          return recordMetadataFuture.get();
         }
         catch (Exception e)
         {
           throw new RuntimeException(e);
         }
-      }, executor);
+      });
 
     completableFuture.whenComplete((metadata, e) ->
     {