Endless Producer: a simple producer, implemented as a Spring Boot app
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Future;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final String topic;
21   private final String acks;
22   private final int throttleMs;
23   private final KafkaProducer<String, String> producer;
24
25   private boolean running = false;
26   private long i = 0;
27   private long produced = 0;
28   private Future<?> future = null;
29
30   public EndlessProducer(
31       ExecutorService executor,
32       String bootstrapServer,
33       String clientId,
34       String topic,
35       String acks,
36       int throttleMs)
37   {
38     this.executor = executor;
39     this.id = clientId;
40     this.topic = topic;
41     this.acks = acks;
42     this.throttleMs = throttleMs;
43
44     Properties props = new Properties();
45     props.put("bootstrap.servers", bootstrapServer);
46     props.put("client.id", clientId);
47     props.put("acks", acks);
48     props.put("key.serializer", StringSerializer.class.getName());
49     props.put("value.serializer", StringSerializer.class.getName());
50
51     this.producer = new KafkaProducer<>(props);
52   }
53
54   @Override
55   public void run()
56   {
57     try
58     {
59       for (; running; i++)
60       {
61         final long time = System.currentTimeMillis();
62
63         final ProducerRecord<String, String> record = new ProducerRecord<>(
64             topic,                 // Topic
65             Long.toString(i % 10), // Key
66             Long.toString(i)       // Value
67         );
68
69         producer.send(record, (metadata, e) ->
70         {
71           long now = System.currentTimeMillis();
72           if (e == null)
73           {
74             // HANDLE SUCCESS
75             produced++;
76             log.debug(
77                 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
78                 id,
79                 record.key(),
80                 record.value(),
81                 metadata.partition(),
82                 metadata.offset(),
83                 metadata.timestamp(),
84                 now - time
85             );
86           }
87           else
88           {
89             // HANDLE ERROR
90             log.error(
91                 "{} - ERROR key={} timestamp={} latency={}ms: {}",
92                 id,
93                 record.key(),
94                 metadata == null ? -1 : metadata.timestamp(),
95                 now - time,
96                 e.toString()
97             );
98           }
99         });
100
101         long now = System.currentTimeMillis();
102         log.trace(
103             "{} - Queued #{} key={} latency={}ms",
104             id,
105             i,
106             record.key(),
107             now - time
108         );
109
110         if (throttleMs > 0)
111         {
112           try
113           {
114             Thread.sleep(throttleMs);
115           }
116           catch (InterruptedException e)
117           {
118             log.warn("{} - Interrupted while throttling!", e);
119           }
120         }
121       }
122
123       log.info("{} - Done", id);
124     }
125     catch (Exception e)
126     {
127
128     }
129   }
130
131   public synchronized void start()
132   {
133     if (running)
134       throw new IllegalStateException("Producer instance " + id + " is already running!");
135
136     log.info("{} - Starting - produced {} messages before", id, produced);
137     running = true;
138     future = executor.submit(this);
139   }
140
141   public synchronized void stop() throws ExecutionException, InterruptedException
142   {
143     if (!running)
144       throw new IllegalStateException("Producer instance " + id + " is not running!");
145
146     log.info("{} - Stopping...", id);
147     running = false;
148     future.get();
149     log.info("{} - Stopped - produced {} messages so far", id, produced);
150   }
151
152   @PreDestroy
153   public void destroy() throws ExecutionException, InterruptedException
154   {
155     log.info("{} - Destroy!", id);
156     try
157     {
158       stop();
159     }
160     catch (IllegalStateException e)
161     {
162       log.info("{} - Was already stopped", id);
163     }
164     finally
165     {
166       log.info("{} - Closing the KafkaProducer", id);
167       producer.close();
168       log.info("{}: Produced {} messages in total, exiting!", id, produced);
169     }
170   }
171 }