5f5792581d860a8b119e5e561250929bc8f35319
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import java.util.Properties;
9
10
11 @Slf4j
12 public class SimpleProducer
13 {
14   public static void main(String[] args) throws Exception
15   {
16     // tag::create[]
17     Properties props = new Properties();
18     props.put("bootstrap.servers", "localhost:9092");
19     props.put("key.serializer", StringSerializer.class.getName());
20     props.put("value.serializer", StringSerializer.class.getName());
21
22     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
23     // end::create[]
24
25     String id = "P";
26     long i = 0;
27
28     try
29     {
30       for (; i < 100 ; i++)
31       {
32         final long time = System.currentTimeMillis();
33
34         final ProducerRecord<String, String> record = new ProducerRecord<>(
35             "test",              // Topic
36             Long.toString(i%10), // Key
37             Long.toString(i)     // Value
38         );
39
40         producer.send(record, (metadata, e) ->
41         {
42           long now = System.currentTimeMillis();
43           if (e == null)
44           {
45             // HANDLE SUCCESS
46             log.debug(
47                 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
48                 id,
49                 record.key(),
50                 record.value(),
51                 metadata.partition(),
52                 metadata.offset(),
53                 metadata.timestamp(),
54                 now - time
55             );
56           }
57           else
58           {
59             // HANDLE ERROR
60             log.error(
61                 "{} - ERROR key={} timestamp={} latency={}ms: {}",
62                 id,
63                 record.key(),
64                 metadata == null ? -1 : metadata.timestamp(),
65                 now - time,
66                 e.toString()
67             );
68           }
69         });
70
71         long now = System.currentTimeMillis();
72         log.trace(
73             "{} - Queued #{} key={} latency={}ms",
74             id,
75             i,
76             record.key(),
77             now - time
78         );
79       }
80     }
81     finally
82     {
83       log.info("{}: Closing the KafkaProducer", id);
84       producer.close();
85       log.info("{}: Exiting!", id);
86     }
87   }
88 }