Konfigurierbar
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 18:09:44 +0000 (20:09 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 18:11:04 +0000 (20:11 +0200)
src/main/java/de/juplo/kafka/ExampleProducer.java

index f899fdd..73aa8e9 100644 (file)
@@ -20,11 +20,13 @@ public class ExampleProducer
   private volatile boolean done = false;
   private long produced = 0;
 
-  public static void main(String[] args) throws Exception
+  public ExampleProducer(
+    String broker,
+    String topic,
+    String clientId)
   {
-    String broker = "localhost:9092";
-    String topic = "test";
-    String clientId = "DEV";
+    this.topic = topic;
+    this.id = clientId;
 
     Properties props = new Properties();
     props.put("bootstrap.servers", broker);
@@ -32,13 +34,12 @@ public class ExampleProducer
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    Producer<String, String> producer = new KafkaProducer<>(props);
+    producer = new KafkaProducer<>(props);
+  }
 
-    String id = clientId;
+  void run() throws Exception
+  {
     long i = 0;
-    boolean running = true;
-    boolean done = false;
-    long produced = 0;
 
     try
     {
@@ -51,12 +52,50 @@ public class ExampleProducer
         );
 
         producer.send(record);
+        produced++;
+        log.info("{}: Send message {}", id, i);
         Thread.sleep(500);
       }
     }
     finally
     {
+      done = true;
       log.info("{}: Produced {} messages in total, exiting!", id, produced);
     }
   }
+
+  public static void main(String[] args) throws Exception
+  {
+    String broker = "localhost:9092";
+    String topic = "test";
+    String clientId = "DEV";
+
+    switch(args.length)
+    {
+      default:
+        throw new RuntimeException("3 Argumente!");
+      case 3:
+        clientId = args[2];
+      case 2:
+        topic = args[1];
+      case 1:
+        broker = args[0];
+    }
+
+    ExampleProducer instance = new ExampleProducer(broker, topic, clientId);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      instance.running = false;
+      while(!instance.done) {
+        try {
+          Thread.sleep(10);
+        }
+        catch (Exception e) {}
+        log.info("Waiting...");
+      }
+      log.info("DONE!");
+    }));
+
+    instance.run();
+  }
 }