import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
@Slf4j
public class ExampleProducer
{
+ public final static int MAX_PENDING_MESSAGES = 100;
+
private final String id;
private final String topic;
private final Producer<String, String> producer;
+ private final ExecutorService executor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 60L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(MAX_PENDING_MESSAGES),
+ new BlockingRejectedExecutionHandler()
+ );
private volatile boolean running = true;
private volatile boolean done = false;
private long produced = 0;
{
throw new RuntimeException(e);
}
- });
+ }, executor);
completableFuture.whenComplete((metadata, e) ->
{
instance.run();
}
+
+
+ static class BlockingRejectedExecutionHandler implements RejectedExecutionHandler
+ {
+
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
+ {
+ try
+ {
+ executor.getQueue().put(runnable); // blockiert bis Platz frei ist
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RejectedExecutionException(e);
+ }
+ }
+ }
}