@Slf4j
public class ExampleProducer
{
+ private final String id;
+ private final String topic;
+ private final Producer<String, String> producer;
+
private volatile boolean running = true;
private volatile boolean done = false;
private long produced = 0;
- public static void main(String[] args) throws Exception
+ public ExampleProducer(
+ String broker,
+ String topic,
+ String clientId)
{
- String broker = "localhost:9092";
- String topic = "test";
- String clientId = "DEV";
-
Properties props = new Properties();
props.put("bootstrap.servers", broker);
props.put("client.id", clientId); // Nur zur Wiedererkennung
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- String id = clientId;
+ this.id = clientId;
+ this.topic = topic;
+ producer = new KafkaProducer<>(props);
+ }
+ public void run() throws Exception
+ {
try
{
- for (long i = 0; true; i++)
+ for (long i = 0; running; i++)
{
final ProducerRecord<String, String> record = new ProducerRecord<>(
topic, // Topic
- Long.toString(i%10), // Key
+ Long.toString(i % 10), // Key
Long.toString(i) // Value
);
finally
{
log.info("{}: Exiting!", id);
+ done = true;
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 3)
+ {
+ System.exit(1);
}
+
+ ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() ->
+ {
+ instance.running = false;
+ while (!instance.done)
+ {
+ log.info("Waiting...");
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {}
+ }
+ log.info("DONE!");
+ }));
+
+ instance.run();
}
}