7a5b324e144fddbaef84c036640e62448c6f7a4f
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12
13
14 @Slf4j
15 public class EndlessProducer implements Runnable
16 {
17   private final ExecutorService executor;
18   private final String id;
19   private final String topic;
20   private final int throttleMs;
21   private final KafkaProducer<String, String> producer;
22
23   private boolean running = false;
24   private long i = 0;
25   private long produced = 0;
26
27   public EndlessProducer(
28       ExecutorService executor,
29       String bootstrapServer,
30       String clientId,
31       String topic,
32       String acks,
33       int throttleMs)
34   {
35     this.executor = executor;
36     this.id = clientId;
37     this.topic = topic;
38     this.throttleMs = throttleMs;
39
40     Properties props = new Properties();
41     props.put("bootstrap.servers", bootstrapServer);
42     props.put("client.id", clientId);
43     props.put("acks", acks);
44     props.put("key.serializer", StringSerializer.class.getName());
45     props.put("value.serializer", StringSerializer.class.getName());
46
47     this.producer = new KafkaProducer<>(props);
48   }
49
50   @Override
51   public void run()
52   {
53     try
54     {
55       for (; running; i++)
56       {
57         send(Long.toString(i%10), Long.toString(i));
58
59         if (throttleMs > 0)
60         {
61           try
62           {
63             Thread.sleep(throttleMs);
64           }
65           catch (InterruptedException e)
66           {
67             log.warn("{} - Interrupted while throttling!", e);
68           }
69         }
70       }
71
72       log.info("{} - Done", id);
73     }
74     catch (Exception e)
75     {
76       log.error("{} - Unexpected Exception:", id, e);
77     }
78     finally
79     {
80       synchronized (this)
81       {
82         running = false;
83         log.info("{} - Stopped - produced {} messages so far", id, produced);
84       }
85     }
86   }
87
88   void send(String key, String value)
89   {
90     final long time = System.currentTimeMillis();
91
92     final ProducerRecord<String, String> record = new ProducerRecord<>(
93         topic,  // Topic
94         key,    // Key
95         value   // Value
96     );
97
98     producer.send(record, (metadata, e) ->
99     {
100       long now = System.currentTimeMillis();
101       if (e == null)
102       {
103         // HANDLE SUCCESS
104         produced++;
105         log.debug(
106             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
107             id,
108             record.key(),
109             record.value(),
110             metadata.partition(),
111             metadata.offset(),
112             metadata.timestamp(),
113             now - time
114         );
115       }
116       else
117       {
118         // HANDLE ERROR
119         log.error(
120             "{} - ERROR key={} timestamp={} latency={}ms: {}",
121             id,
122             record.key(),
123             metadata == null ? -1 : metadata.timestamp(),
124             now - time,
125             e.toString()
126         );
127       }
128     });
129
130     long now = System.currentTimeMillis();
131     log.trace(
132         "{} - Queued #{} key={} latency={}ms",
133         id,
134         value,
135         record.key(),
136         now - time
137     );
138   }
139
140   public synchronized void start()
141   {
142     if (running)
143       throw new IllegalStateException("Producer instance " + id + " is already running!");
144
145     log.info("{} - Starting - produced {} messages before", id, produced);
146     running = true;
147     executor.submit(this);
148   }
149
150   public synchronized void stop() throws ExecutionException, InterruptedException
151   {
152     if (!running)
153       throw new IllegalStateException("Producer instance " + id + " is not running!");
154
155     log.info("{} - Stopping...", id);
156     running = false;
157   }
158
159   @PreDestroy
160   public void destroy() throws ExecutionException, InterruptedException
161   {
162     log.info("{} - Destroy!", id);
163     try
164     {
165       stop();
166     }
167     catch (IllegalStateException e)
168     {
169       log.info("{} - Was already stopped", id);
170     }
171     finally
172     {
173       log.info("{} - Closing the KafkaProducer", id);
174       producer.close();
175       log.info("{}: Produced {} messages in total, exiting!", id, produced);
176     }
177   }
178 }