Nicht benötigte Imports entfernt
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class SimpleProducer
14 {
15   private final String id;
16   private final String topic;
17   private final Producer<String, String> producer;
18
19   private long produced = 0;
20
21   public SimpleProducer(String clientId, String topic)
22   {
23     // tag::create[]
24     Properties props = new Properties();
25     props.put("bootstrap.servers", "localhost:9092");
26     props.put("key.serializer", StringSerializer.class.getName());
27     props.put("value.serializer", StringSerializer.class.getName());
28
29     Producer<String, String> producer = new KafkaProducer<>(props);
30     // end::create[]
31
32     this.id = clientId;
33     this.topic = topic;
34     this.producer = producer;
35   }
36
37   public void run()
38   {
39     long i = 0;
40
41     try
42     {
43       for (; i < 100 ; i++)
44       {
45         send(Long.toString(i%10), Long.toString(i));
46       }
47
48       log.info("{} - Done", id);
49     }
50     finally
51     {
52       log.info("{}: Closing the KafkaProducer", id);
53       producer.close();
54       log.info("{}: Produced {} messages in total, exiting!", id, produced);
55     }
56   }
57
58   void send(String key, String value)
59   {
60     final long time = System.currentTimeMillis();
61
62     final ProducerRecord<String, String> record = new ProducerRecord<>(
63         topic,  // Topic
64         key,    // Key
65         value   // Value
66     );
67
68     producer.send(record, (metadata, e) ->
69     {
70       long now = System.currentTimeMillis();
71       if (e == null)
72       {
73         // HANDLE SUCCESS
74         produced++;
75         log.debug(
76             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
77             id,
78             record.key(),
79             record.value(),
80             metadata.partition(),
81             metadata.offset(),
82             metadata.timestamp(),
83             now - time
84         );
85       }
86       else
87       {
88         // HANDLE ERROR
89         log.error(
90             "{} - ERROR key={} timestamp={} latency={}ms: {}",
91             id,
92             record.key(),
93             metadata == null ? -1 : metadata.timestamp(),
94             now - time,
95             e.toString()
96         );
97       }
98     });
99
100     long now = System.currentTimeMillis();
101     log.trace(
102         "{} - Queued #{} key={} latency={}ms",
103         id,
104         value,
105         record.key(),
106         now - time
107     );
108   }
109
110
111   public static void main(String[] args) throws Exception
112   {
113     SimpleProducer producer = new SimpleProducer("P", "test");
114     producer.run();
115   }
116 }