private volatile boolean done = false;
private long produced = 0;
- public static void main(String[] args) throws Exception
+ public ExampleProducer(
+ String broker,
+ String topic,
+ String clientId)
{
- String broker = "localhost:9092";
- String topic = "test";
- String clientId = "DEV";
+ this.topic = topic;
+ this.id = clientId;
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
- Producer<String, String> producer = new KafkaProducer<>(props);
+ producer = new KafkaProducer<>(props);
+ }
- String id = clientId;
+ void run() throws Exception
+ {
long i = 0;
- boolean running = true;
- boolean done = false;
- long produced = 0;
try
{
);
producer.send(record);
+ produced++;
+ log.info("{}: Send message {}", id, i);
Thread.sleep(500);
}
}
finally
{
+ done = true;
log.info("{}: Produced {} messages in total, exiting!", id, produced);
}
}
+
+ public static void main(String[] args) throws Exception
+ {
+ String broker = "localhost:9092";
+ String topic = "test";
+ String clientId = "DEV";
+
+ switch(args.length)
+ {
+ default:
+ throw new RuntimeException("3 Argumente!");
+ case 3:
+ clientId = args[2];
+ case 2:
+ topic = args[1];
+ case 1:
+ broker = args[0];
+ }
+
+ ExampleProducer instance = new ExampleProducer(broker, topic, clientId);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ instance.running = false;
+ while(!instance.done) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (Exception e) {}
+ log.info("Waiting...");
+ }
+ log.info("DONE!");
+ }));
+
+ instance.run();
+ }
}