Vorlage & Setup für Fire-n-Forget-Übung überarbeitet
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class SimpleProducer
14 {
15   private final String id;
16   private final String topic;
17   private final Producer<String, String> producer;
18
19   private long produced = 0;
20
21   public SimpleProducer(String broker, String topic, String clientId)
22   {
23     Properties props = new Properties();
24     props.put("bootstrap.servers", broker);
25     props.put("client.id", clientId); // Nur zur Wiedererkennung
26     props.put("key.serializer", StringSerializer.class.getName());
27     props.put("value.serializer", StringSerializer.class.getName());
28
29     this.id = clientId;
30     this.topic = topic;
31     producer = new KafkaProducer<>(props);
32   }
33
34   public void run()
35   {
36     try
37     {
38       for (int i = 0; i < 10000 ; i++)
39       {
40         send(Long.toString(i%10), Long.toString(i));
41       }
42     }
43     finally
44     {
45       log.info("{}: Produced {} messages in total, exiting!", id, produced);
46     }
47   }
48
49   void send(String key, String value)
50   {
51     final long time = System.currentTimeMillis();
52
53     final ProducerRecord<String, String> record = new ProducerRecord<>(
54         topic,  // Topic
55         key,    // Key
56         value   // Value
57     );
58
59     producer.send(record, (metadata, e) ->
60     {
61       long now = System.currentTimeMillis();
62       if (e == null)
63       {
64         // HANDLE SUCCESS
65         produced++;
66         log.debug(
67             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
68             id,
69             record.key(),
70             record.value(),
71             metadata.partition(),
72             metadata.offset(),
73             metadata.timestamp(),
74             now - time
75         );
76       }
77       else
78       {
79         // HANDLE ERROR
80         log.error(
81             "{} - ERROR key={} timestamp={} latency={}ms: {}",
82             id,
83             record.key(),
84             metadata == null ? -1 : metadata.timestamp(),
85             now - time,
86             e.toString()
87         );
88       }
89     });
90
91     long now = System.currentTimeMillis();
92     log.trace(
93         "{} - Queued #{} key={} latency={}ms",
94         id,
95         value,
96         record.key(),
97         now - time
98     );
99   }
100
101
102   public static void main(String[] args) throws Exception
103   {
104     String broker = ":9092";
105     String topic = "test";
106     String clientId = "DEV";
107
108     switch (args.length)
109     {
110       case 3:
111         clientId = args[2];
112       case 2:
113         topic = args[1];
114       case 1:
115         broker = args[0];
116     }
117
118     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
119
120     log.info(
121         "Running SimpleProducer: broker={}, topic={}, client-id={}",
122         broker,
123         topic,
124         clientId);
125     instance.run();
126   }
127 }