9fcaa6e1592fd009dbd5bad4f2597f47da1a19c7
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import java.util.Properties;
9
10
11 @Slf4j
12 public class SimpleProducer
13 {
14   private final String id;
15   private final String topic;
16   private final KafkaProducer<String, String> producer;
17
18   private long produced = 0;
19   private volatile boolean running = true;
20   private volatile boolean done = false;
21
22   public SimpleProducer(String broker, String topic, String clientId)
23   {
24     Properties props = new Properties();
25     props.put("bootstrap.servers", broker);
26     props.put("client.id", clientId); // Nur zur Wiedererkennung
27     props.put("key.serializer", StringSerializer.class.getName());
28     props.put("value.serializer", StringSerializer.class.getName());
29
30     producer = new KafkaProducer<>(props);
31
32     this.topic = topic;
33     this.id = clientId;
34   }
35
36   public void run()
37   {
38     long i = 0;
39
40     try
41     {
42       for (; running ; i++)
43       {
44         send(Long.toString(i%10), Long.toString(i));
45         Thread.sleep(500);
46       }
47     }
48     catch (InterruptedException e) {}
49     finally
50     {
51       log.info("{}: Closing the KafkaProducer", id);
52       producer.close();
53       log.info("{}: Produced {} messages in total, exiting!", id, produced);
54       done = true;
55     }
56   }
57
58   void send(String key, String value)
59   {
60     final long time = System.currentTimeMillis();
61
62     final ProducerRecord<String, String> record = new ProducerRecord<>(
63         topic,  // Topic
64         key,    // Key
65         value   // Value
66     );
67
68     producer.send(record, (metadata, e) ->
69     {
70       long now = System.currentTimeMillis();
71       if (e == null)
72       {
73         // HANDLE SUCCESS
74         produced++;
75         log.debug(
76             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
77             id,
78             record.key(),
79             record.value(),
80             metadata.partition(),
81             metadata.offset(),
82             metadata.timestamp(),
83             now - time
84         );
85       }
86       else
87       {
88         // HANDLE ERROR
89         log.error(
90             "{} - ERROR key={} timestamp={} latency={}ms: {}",
91             id,
92             record.key(),
93             metadata == null ? -1 : metadata.timestamp(),
94             now - time,
95             e.toString()
96         );
97       }
98     });
99
100     long now = System.currentTimeMillis();
101     log.trace(
102         "{} - Queued #{} key={} latency={}ms",
103         id,
104         value,
105         record.key(),
106         now - time
107     );
108   }
109
110
111   public static void main(String[] args) throws Exception
112   {
113     String broker = ":9092";
114     String topic = "test";
115     String clientId = "DEV";
116
117     switch (args.length)
118     {
119       case 3:
120         clientId = args[2];
121       case 2:
122         topic = args[1];
123       case 1:
124         broker = args[0];
125     }
126
127     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
128
129     Runtime.getRuntime().addShutdownHook(new Thread(() ->
130     {
131       instance.running = false;
132       while (!instance.done)
133       {
134         log.info("Waiting for main-thread...");
135         try
136         {
137           Thread.sleep(1000);
138         }
139         catch (InterruptedException e) {}
140       }
141       log.info("Shutdown completed.");
142     }));
143
144     log.info(
145         "Running SimpleProducer: broker={}, topic={}, client-id={}",
146         broker,
147         topic,
148         clientId);
149     instance.run();
150   }
151 }