Obsoletes Klassen-Attribut entfernt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final String topic;
21   private final int throttleMs;
22   private final KafkaProducer<String, String> producer;
23
24   private boolean running = false;
25   private long i = 0;
26   private long produced = 0;
27   private Future<?> future = null;
28
29   public EndlessProducer(
30       ExecutorService executor,
31       String bootstrapServer,
32       String clientId,
33       String topic,
34       String acks,
35       int throttleMs)
36   {
37     this.executor = executor;
38     this.id = clientId;
39     this.topic = topic;
40     this.throttleMs = throttleMs;
41
42     Properties props = new Properties();
43     props.put("bootstrap.servers", bootstrapServer);
44     props.put("client.id", clientId);
45     props.put("acks", acks);
46     props.put("key.serializer", StringSerializer.class.getName());
47     props.put("value.serializer", StringSerializer.class.getName());
48
49     this.producer = new KafkaProducer<>(props);
50   }
51
52   @Override
53   public void run()
54   {
55     try
56     {
57       for (; running; i++)
58       {
59         final long time = System.currentTimeMillis();
60
61         final ProducerRecord<String, String> record = new ProducerRecord<>(
62             topic,                 // Topic
63             Long.toString(i % 10), // Key
64             Long.toString(i)       // Value
65         );
66
67         producer.send(record, (metadata, e) ->
68         {
69           long now = System.currentTimeMillis();
70           if (e == null)
71           {
72             // HANDLE SUCCESS
73             produced++;
74             log.debug(
75                 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
76                 id,
77                 record.key(),
78                 record.value(),
79                 metadata.partition(),
80                 metadata.offset(),
81                 metadata.timestamp(),
82                 now - time
83             );
84           }
85           else
86           {
87             // HANDLE ERROR
88             log.error(
89                 "{} - ERROR key={} timestamp={} latency={}ms: {}",
90                 id,
91                 record.key(),
92                 metadata == null ? -1 : metadata.timestamp(),
93                 now - time,
94                 e.toString()
95             );
96           }
97         });
98
99         long now = System.currentTimeMillis();
100         log.trace(
101             "{} - Queued #{} key={} latency={}ms",
102             id,
103             i,
104             record.key(),
105             now - time
106         );
107
108         if (throttleMs > 0)
109         {
110           try
111           {
112             Thread.sleep(throttleMs);
113           }
114           catch (InterruptedException e)
115           {
116             log.warn("{} - Interrupted while throttling!", e);
117           }
118         }
119       }
120
121       log.info("{} - Done", id);
122     }
123     catch (Exception e)
124     {
125
126     }
127   }
128
129   public synchronized void start()
130   {
131     if (running)
132       throw new IllegalStateException("Producer instance " + id + " is already running!");
133
134     log.info("{} - Starting - produced {} messages before", id, produced);
135     running = true;
136     future = executor.submit(this);
137   }
138
139   public synchronized void stop() throws ExecutionException, InterruptedException
140   {
141     if (!running)
142       throw new IllegalStateException("Producer instance " + id + " is not running!");
143
144     log.info("{} - Stopping...", id);
145     running = false;
146     future.get();
147     log.info("{} - Stopped - produced {} messages so far", id, produced);
148   }
149
150   @PreDestroy
151   public void destroy() throws ExecutionException, InterruptedException
152   {
153     log.info("{} - Destroy!", id);
154     try
155     {
156       stop();
157     }
158     catch (IllegalStateException e)
159     {
160       log.info("{} - Was already stopped", id);
161     }
162     finally
163     {
164       log.info("{} - Closing the KafkaProducer", id);
165       producer.close();
166       log.info("{}: Produced {} messages in total, exiting!", id, produced);
167     }
168   }
169 }