fcf42de3eebc8bf22c3dd1f3eb9cc43ecb30a32f
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12
13
14 @Slf4j
15 public class EndlessProducer implements Runnable
16 {
17   private final ExecutorService executor;
18   private final String id;
19   private final String topic;
20   private final int throttleMs;
21   private final KafkaProducer<String, String> producer;
22
23   private boolean running = false;
24   private long i = 0;
25   private long produced = 0;
26
27   public EndlessProducer(
28       ExecutorService executor,
29       String bootstrapServer,
30       String clientId,
31       String topic,
32       String acks,
33       int throttleMs)
34   {
35     this.executor = executor;
36     this.id = clientId;
37     this.topic = topic;
38     this.throttleMs = throttleMs;
39
40     Properties props = new Properties();
41     props.put("bootstrap.servers", bootstrapServer);
42     props.put("client.id", clientId);
43     props.put("acks", acks);
44     props.put("metadata.max.age.ms", "1000");
45     props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");
46     props.put("key.serializer", StringSerializer.class.getName());
47     props.put("value.serializer", StringSerializer.class.getName());
48
49     this.producer = new KafkaProducer<>(props);
50   }
51
52   @Override
53   public void run()
54   {
55     try
56     {
57       for (; running; i++)
58       {
59         send(Long.toString(i%10), Long.toString(i));
60
61         if (throttleMs > 0)
62         {
63           try
64           {
65             Thread.sleep(throttleMs);
66           }
67           catch (InterruptedException e)
68           {
69             log.warn("{} - Interrupted while throttling!", e);
70           }
71         }
72       }
73
74       log.info("{} - Done", id);
75     }
76     catch (Exception e)
77     {
78       log.error("{} - Unexpected Exception:", id, e);
79     }
80     finally
81     {
82       synchronized (this)
83       {
84         running = false;
85         log.info("{} - Stopped - produced {} messages so far", id, produced);
86       }
87     }
88   }
89
90   void send(String key, String value)
91   {
92     final long time = System.currentTimeMillis();
93
94     final ProducerRecord<String, String> record = new ProducerRecord<>(
95         topic,  // Topic
96         key,    // Key
97         value   // Value
98     );
99
100     producer.send(record, (metadata, e) ->
101     {
102       long now = System.currentTimeMillis();
103       if (e == null)
104       {
105         // HANDLE SUCCESS
106         produced++;
107         log.debug(
108             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
109             id,
110             record.key(),
111             record.value(),
112             metadata.partition(),
113             metadata.offset(),
114             metadata.timestamp(),
115             now - time
116         );
117       }
118       else
119       {
120         // HANDLE ERROR
121         log.error(
122             "{} - ERROR key={} timestamp={} latency={}ms: {}",
123             id,
124             record.key(),
125             metadata == null ? -1 : metadata.timestamp(),
126             now - time,
127             e.toString()
128         );
129       }
130     });
131
132     long now = System.currentTimeMillis();
133     log.trace(
134         "{} - Queued #{} key={} latency={}ms",
135         id,
136         value,
137         record.key(),
138         now - time
139     );
140   }
141
142   public synchronized void start()
143   {
144     if (running)
145       throw new IllegalStateException("Producer instance " + id + " is already running!");
146
147     log.info("{} - Starting - produced {} messages before", id, produced);
148     running = true;
149     executor.submit(this);
150   }
151
152   public synchronized void stop() throws ExecutionException, InterruptedException
153   {
154     if (!running)
155       throw new IllegalStateException("Producer instance " + id + " is not running!");
156
157     log.info("{} - Stopping...", id);
158     running = false;
159   }
160
161   @PreDestroy
162   public void destroy() throws ExecutionException, InterruptedException
163   {
164     log.info("{} - Destroy!", id);
165     try
166     {
167       stop();
168     }
169     catch (IllegalStateException e)
170     {
171       log.info("{} - Was already stopped", id);
172     }
173     finally
174     {
175       log.info("{} - Closing the KafkaProducer", id);
176       producer.close();
177       log.info("{}: Produced {} messages in total, exiting!", id, produced);
178     }
179   }
180 }