From f3f5275dd9fdf6743a589b8574738c05a00e774f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Apr 2025 20:09:44 +0200 Subject: [PATCH] Konfigurierbar --- .../java/de/juplo/kafka/ExampleProducer.java | 54 +++++++++++++++---- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 914a163b..080102dc 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,33 +12,39 @@ import java.util.Properties; @Slf4j public class ExampleProducer { + private final String id; + private final String topic; + private final Producer producer; + private volatile boolean running = true; private volatile boolean done = false; private long produced = 0; - public static void main(String[] args) throws Exception + public ExampleProducer( + String broker, + String topic, + String clientId) { - String broker = "localhost:9092"; - String topic = "test"; - String clientId = "DEV"; - Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("client.id", clientId); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - Producer producer = new KafkaProducer<>(props); - - String id = clientId; + this.id = clientId; + this.topic = topic; + producer = new KafkaProducer<>(props); + } + public void run() throws Exception + { try { - for (long i = 0; true; i++) + for (long i = 0; running; i++) { final ProducerRecord record = new ProducerRecord<>( topic, // Topic - Long.toString(i%10), // Key + Long.toString(i % 10), // Key Long.toString(i) // Value ); @@ -52,6 +58,34 @@ public class ExampleProducer finally { log.info("{}: Exiting!", id); + done = true; + } + } + + public static void main(String[] args) throws Exception + { + if (args.length != 3) + { + System.exit(1); } + + ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> + { + instance.running = false; + while (!instance.done) + { + log.info("Waiting..."); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("DONE!"); + })); + + instance.run(); } } -- 2.39.5