From 47c922d074f0b7382222f650723d44a72b5b2e5f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Mar 2026 16:45:23 +0100 Subject: [PATCH] Fehler korrigiert: Erhalt der Sendereihenfolge trotz `supplyAsync()` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Für die Ausführung wird jetzt ein Executor mit nur einem Thread verwendet * Dadurch ist sichergestellt, dass die Reihenfolge erhalten bleibt --- .../java/de/juplo/kafka/ExampleProducer.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 14b7cbc1..ffdb0e99 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -8,17 +8,26 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; +import java.util.concurrent.*; @Slf4j public class ExampleProducer { + public final static int MAX_PENDING_MESSAGES = 100; + private final String id; private final String topic; private final Producer producer; + private final ExecutorService executor = new ThreadPoolExecutor( + 1, + 1, + 60L, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES), + new BlockingRejectedExecutionHandler() + ); private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; @@ -94,7 +103,7 @@ public class ExampleProducer { throw new RuntimeException(e); } - }); + }, executor); completableFuture.whenComplete((metadata, e) -> { @@ -176,4 +185,22 @@ public class ExampleProducer instance.run(); } + + + static class BlockingRejectedExecutionHandler implements RejectedExecutionHandler + { + + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) + { + try + { + executor.getQueue().put(runnable); // blockiert bis Platz frei ist + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException(e); + } + } + } } -- 2.39.5