private final String topic;
private final Producer<String, String> producer;
- private final ExecutorService executor = new ThreadPoolExecutor(
- 1,
- 1,
- 60L,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
- new BlockingRejectedExecutionHandler()
- );
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Semaphore semaphore = new Semaphore(MAX_PENDING_MESSAGES);
private volatile boolean running = true;
private volatile boolean done = false;
instance.run();
}
-
-
- static class BlockingRejectedExecutionHandler implements RejectedExecutionHandler
- {
-
- @Override
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
- {
- try
- {
- executor.getQueue().put(runnable); // blockiert bis Platz frei ist
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RejectedExecutionException(e);
- }
- }
- }
}