From 9c34e38a45ba5ae28f73f2829f0a30aeec85db97 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 20:09:44 +0200 Subject: [PATCH] Konfigurierbar --- .../java/de/juplo/kafka/ExampleProducer.java | 57 ++++++++++++++++--- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index f899fdd..73aa8e9 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -20,11 +20,13 @@ public class ExampleProducer private volatile boolean done = false; private long produced = 0; - public static void main(String[] args) throws Exception + public ExampleProducer( + String broker, + String topic, + String clientId) { - String broker = "localhost:9092"; - String topic = "test"; - String clientId = "DEV"; + this.topic = topic; + this.id = clientId; Properties props = new Properties(); props.put("bootstrap.servers", broker); @@ -32,13 +34,12 @@ public class ExampleProducer props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - Producer producer = new KafkaProducer<>(props); + producer = new KafkaProducer<>(props); + } - String id = clientId; + void run() throws Exception + { long i = 0; - boolean running = true; - boolean done = false; - long produced = 0; try { @@ -51,12 +52,50 @@ public class ExampleProducer ); producer.send(record); + produced++; + log.info("{}: Send message {}", id, i); Thread.sleep(500); } } finally { + done = true; log.info("{}: Produced {} messages in total, exiting!", id, produced); } } + + public static void main(String[] args) throws Exception + { + String broker = "localhost:9092"; + String topic = "test"; + String clientId = "DEV"; + + switch(args.length) + { + default: + throw new RuntimeException("3 Argumente!"); + case 3: + clientId = args[2]; + case 2: + topic = args[1]; + case 1: + broker = args[0]; + } + + ExampleProducer instance = new ExampleProducer(broker, topic, clientId); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + instance.running = false; + while(!instance.done) { + try { + Thread.sleep(10); + } + catch (Exception e) {} + log.info("Waiting..."); + } + log.info("DONE!"); + })); + + instance.run(); + } } -- 2.20.1