]> juplo.de Git - demos/kafka/training/commitdiff
Die begrenzte Queue mit Blocking wird nicht mehr benötigt
authorKai Moritz <kai@juplo.de>
Sun, 15 Mar 2026 15:55:13 +0000 (16:55 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:02:05 +0000 (21:02 +0100)
src/main/java/de/juplo/kafka/ExampleProducer.java

index b4dfebb9ea17b40ca50e29e6d3e3868f34523a90..24add8f83aae3291616cd62fc02b6435f2a1ac74 100644 (file)
@@ -20,14 +20,7 @@ public class ExampleProducer
   private final String topic;
   private final Producer<String, String> producer;
 
-  private final ExecutorService executor = new ThreadPoolExecutor(
-    1,
-    1,
-    60L,
-    TimeUnit.MILLISECONDS,
-    new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
-    new BlockingRejectedExecutionHandler()
-  );
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
   private final Semaphore semaphore = new Semaphore(MAX_PENDING_MESSAGES);
   private volatile boolean running = true;
   private volatile boolean done = false;
@@ -189,22 +182,4 @@ public class ExampleProducer
 
     instance.run();
   }
-
-
-  static class BlockingRejectedExecutionHandler implements RejectedExecutionHandler
-  {
-
-    @Override
-    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
-    {
-      try
-      {
-        executor.getQueue().put(runnable);   // blockiert bis Platz frei ist
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RejectedExecutionException(e);
-      }
-    }
-  }
 }