Beispiele verwenden das Interface, um die erzeugte Instanz abzulegen
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
12
13
14 @Slf4j
15 public class SimpleProducer
16 {
17   private final String id;
18   private final String topic;
19   private final Producer<String, String> producer;
20
21   private long produced = 0;
22
23   public SimpleProducer(String clientId, String topic)
24   {
25     // tag::create[]
26     Properties props = new Properties();
27     props.put("bootstrap.servers", "localhost:9092");
28     props.put("key.serializer", StringSerializer.class.getName());
29     props.put("value.serializer", StringSerializer.class.getName());
30
31     Producer<String, String> producer = new KafkaProducer<>(props);
32     // end::create[]
33
34     this.id = clientId;
35     this.topic = topic;
36     this.producer = producer;
37   }
38
39   public void run()
40   {
41     long i = 0;
42
43     try
44     {
45       for (; i < 100 ; i++)
46       {
47         send(Long.toString(i%10), Long.toString(i));
48       }
49
50       log.info("{} - Done", id);
51     }
52     finally
53     {
54       log.info("{}: Closing the KafkaProducer", id);
55       producer.close();
56       log.info("{}: Produced {} messages in total, exiting!", id, produced);
57     }
58   }
59
60   void send(String key, String value)
61   {
62     final long time = System.currentTimeMillis();
63
64     final ProducerRecord<String, String> record = new ProducerRecord<>(
65         topic,  // Topic
66         key,    // Key
67         value   // Value
68     );
69
70     producer.send(record, (metadata, e) ->
71     {
72       long now = System.currentTimeMillis();
73       if (e == null)
74       {
75         // HANDLE SUCCESS
76         produced++;
77         log.debug(
78             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
79             id,
80             record.key(),
81             record.value(),
82             metadata.partition(),
83             metadata.offset(),
84             metadata.timestamp(),
85             now - time
86         );
87       }
88       else
89       {
90         // HANDLE ERROR
91         log.error(
92             "{} - ERROR key={} timestamp={} latency={}ms: {}",
93             id,
94             record.key(),
95             metadata == null ? -1 : metadata.timestamp(),
96             now - time,
97             e.toString()
98         );
99       }
100     });
101
102     long now = System.currentTimeMillis();
103     log.trace(
104         "{} - Queued #{} key={} latency={}ms",
105         id,
106         value,
107         record.key(),
108         now - time
109     );
110   }
111
112
113   public static void main(String[] args) throws Exception
114   {
115     SimpleProducer producer = new SimpleProducer("P", "test");
116     producer.run();
117   }
118 }