Beispiel verwendet das Interface, um die erzeugte Instanz abzulegen
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class SimpleProducer
14 {
15   private final String id;
16   private final String topic;
17   private final Producer<String, String> producer;
18
19   private long produced = 0;
20   private volatile boolean running = true;
21   private volatile boolean done = false;
22
23   public SimpleProducer(String broker, String topic, String clientId)
24   {
25     Properties props = new Properties();
26     props.put("bootstrap.servers", broker);
27     props.put("client.id", clientId); // Nur zur Wiedererkennung
28     props.put("key.serializer", StringSerializer.class.getName());
29     props.put("value.serializer", StringSerializer.class.getName());
30
31     producer = new KafkaProducer<>(props);
32
33     this.topic = topic;
34     this.id = clientId;
35   }
36
37   public void run()
38   {
39     long i = 0;
40
41     try
42     {
43       for (; running ; i++)
44       {
45         send(Long.toString(i%10), Long.toString(i));
46         Thread.sleep(500);
47       }
48     }
49     catch (InterruptedException e) {}
50     finally
51     {
52       log.info("{}: Closing the KafkaProducer", id);
53       producer.close();
54       log.info("{}: Produced {} messages in total, exiting!", id, produced);
55       done = true;
56     }
57   }
58
59   void send(String key, String value)
60   {
61     final long time = System.currentTimeMillis();
62
63     final ProducerRecord<String, String> record = new ProducerRecord<>(
64         topic,  // Topic
65         key,    // Key
66         value   // Value
67     );
68
69     producer.send(record, (metadata, e) ->
70     {
71       long now = System.currentTimeMillis();
72       if (e == null)
73       {
74         // HANDLE SUCCESS
75         produced++;
76         log.debug(
77             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
78             id,
79             record.key(),
80             record.value(),
81             metadata.partition(),
82             metadata.offset(),
83             metadata.timestamp(),
84             now - time
85         );
86       }
87       else
88       {
89         // HANDLE ERROR
90         log.error(
91             "{} - ERROR key={} timestamp={} latency={}ms: {}",
92             id,
93             record.key(),
94             metadata == null ? -1 : metadata.timestamp(),
95             now - time,
96             e.toString()
97         );
98       }
99     });
100
101     long now = System.currentTimeMillis();
102     log.trace(
103         "{} - Queued #{} key={} latency={}ms",
104         id,
105         value,
106         record.key(),
107         now - time
108     );
109   }
110
111
112   public static void main(String[] args) throws Exception
113   {
114     String broker = ":9092";
115     String topic = "test";
116     String clientId = "DEV";
117
118     switch (args.length)
119     {
120       case 3:
121         clientId = args[2];
122       case 2:
123         topic = args[1];
124       case 1:
125         broker = args[0];
126     }
127
128     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
129
130     Runtime.getRuntime().addShutdownHook(new Thread(() ->
131     {
132       instance.running = false;
133       while (!instance.done)
134       {
135         log.info("Waiting for main-thread...");
136         try
137         {
138           Thread.sleep(1000);
139         }
140         catch (InterruptedException e) {}
141       }
142       log.info("Shutdown completed.");
143     }));
144
145     log.info(
146         "Running SimpleProducer: broker={}, topic={}, client-id={}",
147         broker,
148         topic,
149         clientId);
150     instance.run();
151   }
152 }