private final Producer<String, String> producer;
private long produced = 0;
- private volatile boolean running = true;
- private volatile boolean done = false;
public SimpleProducer(String broker, String topic, String clientId)
{
public void run()
{
- long i = 0;
-
try
{
- for (; running ; i++)
+ for (int i = 0; i < 10000 ; i++)
{
send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
}
}
- catch (InterruptedException e) {}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
- done = true;
}
}
SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.running = false;
- while (!instance.done)
- {
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
- }
- log.info("Shutdown completed.");
- }));
-
log.info(
"Running SimpleProducer: broker={}, topic={}, client-id={}",
broker,