357e07a93e4de5b5b1c80e29076a7d4ba7c128f4
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import java.util.Properties;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.Future;
11
12
13 @Slf4j
14 public class SimpleProducer
15 {
16   private final String id;
17   private final String topic;
18   private final KafkaProducer<String, String> producer;
19
20   private long produced = 0;
21
22   public SimpleProducer(String clientId, String topic)
23   {
24     // tag::create[]
25     Properties props = new Properties();
26     props.put("bootstrap.servers", "localhost:9092");
27     props.put("key.serializer", StringSerializer.class.getName());
28     props.put("value.serializer", StringSerializer.class.getName());
29
30     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
31     // end::create[]
32
33     this.id = clientId;
34     this.topic = topic;
35     this.producer = producer;
36   }
37
38   public void run()
39   {
40     long i = 0;
41
42     try
43     {
44       for (; i < 100 ; i++)
45       {
46         send(Long.toString(i%10), Long.toString(i));
47       }
48
49       log.info("{} - Done", id);
50     }
51     finally
52     {
53       log.info("{}: Closing the KafkaProducer", id);
54       producer.close();
55       log.info("{}: Produced {} messages in total, exiting!", id, produced);
56     }
57   }
58
59   void send(String key, String value)
60   {
61     final long time = System.currentTimeMillis();
62
63     final ProducerRecord<String, String> record = new ProducerRecord<>(
64         "test", // Topic
65         key,    // Key
66         value   // Value
67     );
68
69     producer.send(record, (metadata, e) ->
70     {
71       long now = System.currentTimeMillis();
72       if (e == null)
73       {
74         // HANDLE SUCCESS
75         produced++;
76         log.debug(
77             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
78             id,
79             record.key(),
80             record.value(),
81             metadata.partition(),
82             metadata.offset(),
83             metadata.timestamp(),
84             now - time
85         );
86       }
87       else
88       {
89         // HANDLE ERROR
90         log.error(
91             "{} - ERROR key={} timestamp={} latency={}ms: {}",
92             id,
93             record.key(),
94             metadata == null ? -1 : metadata.timestamp(),
95             now - time,
96             e.toString()
97         );
98       }
99     });
100
101     long now = System.currentTimeMillis();
102     log.trace(
103         "{} - Queued #{} key={} latency={}ms",
104         id,
105         value,
106         record.key(),
107         now - time
108     );
109   }
110
111
112   public static void main(String[] args) throws Exception
113   {
114     SimpleProducer producer = new SimpleProducer("P", "test");
115     producer.run();
116   }
117 }