]> juplo.de Git - demos/kafka/training/commitdiff
Fehler korrigiert: Erhalt der Sendereihenfolge trotz `supplyAsync()`
authorKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 15:45:23 +0000 (16:45 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 15:48:38 +0000 (16:48 +0100)
* Für die Ausführung wird jetzt ein Executor mit nur einem Thread verwendet
* Dadurch ist sichergestellt, dass die Reihenfolge erhalten bleibt

src/main/java/de/juplo/kafka/ExampleProducer.java

index cb7ba1b1e4e323dd166286abd0fc24add55b8f37..df804c934c643ffe66cc23bf9ce61102b90338e8 100644 (file)
@@ -8,17 +8,26 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 
 @Slf4j
 public class ExampleProducer
 {
+  public final static int MAX_PENDING_MESSAGES = 100;
+
   private final String id;
   private final String topic;
   private final Producer<String, String> producer;
 
+  private final ExecutorService executor = new ThreadPoolExecutor(
+    1,
+    1,
+    60L,
+    TimeUnit.MILLISECONDS,
+    new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
+    new BlockingRejectedExecutionHandler()
+  );
   private volatile boolean running = true;
   private volatile boolean done = false;
   private long produced = 0;
@@ -94,7 +103,7 @@ public class ExampleProducer
         {
           throw new RuntimeException(e);
         }
-      });
+      }, executor);
 
     completableFuture.whenComplete((metadata, e) ->
     {
@@ -176,4 +185,22 @@ public class ExampleProducer
 
     instance.run();
   }
+
+
+  static class BlockingRejectedExecutionHandler implements RejectedExecutionHandler
+  {
+
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
+    {
+      try
+      {
+        executor.getQueue().put(runnable);   // blockiert bis Platz frei ist
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RejectedExecutionException(e);
+      }
+    }
+  }
 }