]> juplo.de Git - demos/kafka/training/commitdiff
Konfigurierbar
authorKai Moritz <kai@juplo.de>
Tue, 1 Apr 2025 18:09:44 +0000 (20:09 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 14:43:23 +0000 (15:43 +0100)
src/main/java/de/juplo/kafka/ExampleProducer.java

index 914a163b1536a6b471137a223b0a4d8273c8ae39..080102dc96c85d1c66eed6020f708e26d3b05709 100644 (file)
@@ -12,33 +12,39 @@ import java.util.Properties;
 @Slf4j
 public class ExampleProducer
 {
+  private final String id;
+  private final String topic;
+  private final Producer<String, String> producer;
+
   private volatile boolean running = true;
   private volatile boolean done = false;
   private long produced = 0;
 
-  public static void main(String[] args) throws Exception
+  public ExampleProducer(
+    String broker,
+    String topic,
+    String clientId)
   {
-    String broker = "localhost:9092";
-    String topic = "test";
-    String clientId = "DEV";
-
     Properties props = new Properties();
     props.put("bootstrap.servers", broker);
     props.put("client.id", clientId); // Nur zur Wiedererkennung
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    Producer<String, String> producer = new KafkaProducer<>(props);
-
-    String id = clientId;
+    this.id = clientId;
+    this.topic = topic;
+    producer = new KafkaProducer<>(props);
+  }
 
+  public void run() throws Exception
+  {
     try
     {
-      for (long i = 0; true; i++)
+      for (long i = 0; running; i++)
       {
         final ProducerRecord<String, String> record = new ProducerRecord<>(
           topic,                 // Topic
-          Long.toString(i%10), // Key
+          Long.toString(i % 10), // Key
           Long.toString(i)       // Value
         );
 
@@ -52,6 +58,34 @@ public class ExampleProducer
     finally
     {
       log.info("{}: Exiting!", id);
+      done = true;
+    }
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 3)
+    {
+      System.exit(1);
     }
+
+    ExampleProducer instance = new ExampleProducer(args[0], args[1], args[2]);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() ->
+    {
+      instance.running = false;
+      while (!instance.done)
+      {
+        log.info("Waiting...");
+        try
+        {
+          Thread.sleep(1000);
+        }
+        catch (InterruptedException e) {}
+      }
+      log.info("DONE!");
+    }));
+
+    instance.run();
   }
 }