Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class SimpleProducer
14 {
15   private final String id;
16   private final String topic;
17   private final Producer<String, String> producer;
18
19   private long produced = 0;
20   private volatile boolean running = true;
21   private volatile boolean done = false;
22
23   public SimpleProducer(String broker, String topic, String clientId)
24   {
25     Properties props = new Properties();
26     // TODO: Konfiguration für den Producer zusammenstellen
27
28     this.id = clientId;
29     this.topic = topic;
30     producer = new KafkaProducer<>(props);
31   }
32
33   public void run()
34   {
35     long i = 0;
36
37     try
38     {
39       for (; running ; i++)
40       {
41         send(Long.toString(i%10), Long.toString(i));
42         Thread.sleep(500);
43       }
44     }
45     catch (InterruptedException e) {}
46     finally
47     {
48       log.info("{}: Closing the KafkaProducer", id);
49       producer.close();
50       log.info("{}: Produced {} messages in total, exiting!", id, produced);
51       done = true;
52     }
53   }
54
55   void send(String key, String value)
56   {
57     final long time = System.currentTimeMillis();
58
59     ProducerRecord<String, String> record = null;
60     // TODO:
61     //
62     // 1. ProducerRecord<String, String> erzeugen
63     // 2. Record mit producer.send(...) verschicken
64     // Dabei beachten:
65     // * Callback implementieren und Erfolg/Fehler logge
66     // * Tipp: Aktuellen Zeitstempel auch im Callback erzeugen und
67     //   Millisekunden berechnen, die bis zum Senden verstrichen sind:
68     //   long now = System.currentTimeMillis(); // Für: now - time
69
70     long now = System.currentTimeMillis();
71     log.trace(
72         "{} - Queued #{} key={} latency={}ms",
73         id,
74         value,
75         record.key(),
76         now - time
77     );
78   }
79
80
81   public static void main(String[] args) throws Exception
82   {
83     String broker = ":9092";
84     String topic = "test";
85     String clientId = "DEV";
86
87     switch (args.length)
88     {
89       case 3:
90         clientId = args[2];
91       case 2:
92         topic = args[1];
93       case 1:
94         broker = args[0];
95     }
96
97     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
98
99     Runtime.getRuntime().addShutdownHook(new Thread(() ->
100     {
101       instance.running = false;
102       while (!instance.done)
103       {
104         log.info("Waiting for main-thread...");
105         try
106         {
107           Thread.sleep(1000);
108         }
109         catch (InterruptedException e) {}
110       }
111       log.info("Shutdown completed.");
112     }));
113
114     log.info(
115         "Running SimpleProducer: broker={}, topic={}, client-id={}",
116         broker,
117         topic,
118         clientId);
119     instance.run();
120   }
121 }