EndlessProducer arbeitet jetzt transaktional
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12
13
14 @Slf4j
15 public class EndlessProducer implements Runnable
16 {
17   private final ExecutorService executor;
18   private final String id;
19   private final String topic;
20   private final int commitIntervalMs;
21   private final int throttleMs;
22   private final KafkaProducer<String, String> producer;
23
24   private boolean running = false;
25   private long i = 0;
26   private long produced = 0;
27   private long lastCommit;
28
29   public EndlessProducer(
30       ExecutorService executor,
31       String bootstrapServer,
32       String clientId,
33       String topic,
34       String acks,
35       int commitIntervalMs,
36       int throttleMs)
37   {
38     this.executor = executor;
39     this.id = clientId;
40     this.topic = topic;
41     this.commitIntervalMs = commitIntervalMs;
42     this.throttleMs = throttleMs;
43
44     Properties props = new Properties();
45     props.put("bootstrap.servers", bootstrapServer);
46     props.put("transactional.id", clientId);
47     props.put("client.id", clientId);
48     props.put("acks", acks);
49     props.put("key.serializer", StringSerializer.class.getName());
50     props.put("value.serializer", StringSerializer.class.getName());
51
52     this.producer = new KafkaProducer<>(props);
53   }
54
55   @Override
56   public void run()
57   {
58     try
59     {
60       producer.initTransactions();
61
62       lastCommit = System.currentTimeMillis();
63       log.info("{} - Beginning transaction", id);
64       producer.beginTransaction();
65
66       for (; running; i++)
67       {
68         send(Long.toString(i%10), Long.toString(i));
69
70         if (throttleMs > 0)
71         {
72           try
73           {
74             Thread.sleep(throttleMs);
75           }
76           catch (InterruptedException e)
77           {
78             log.warn("{} - Interrupted while throttling!", e);
79           }
80         }
81
82         long now = System.currentTimeMillis();
83         if (now - lastCommit >= commitIntervalMs)
84         {
85           log.info("{} - Commiting transaction", id);
86           producer.commitTransaction();
87           lastCommit = now;
88           log.info("{} - Beginning new transaction", id);
89           producer.beginTransaction();
90         }
91       }
92
93       log.info("{} - Commiting transaction", id);
94       producer.commitTransaction();
95       log.info("{} - Done", id);
96     }
97     catch (Exception e)
98     {
99       log.error("{} - Unexpected Exception:", id, e);
100       log.info("{} - Aborting transaction", id);
101       producer.abortTransaction();
102     }
103     finally
104     {
105       synchronized (this)
106       {
107         running = false;
108         log.info("{} - Stopped - produced {} messages so far", id, produced);
109       }
110     }
111   }
112
113   void send(String key, String value)
114   {
115     final long time = System.currentTimeMillis();
116
117     final ProducerRecord<String, String> record = new ProducerRecord<>(
118         topic,  // Topic
119         key,    // Key
120         value   // Value
121     );
122
123     producer.send(record, (metadata, e) ->
124     {
125       long now = System.currentTimeMillis();
126       if (e == null)
127       {
128         // HANDLE SUCCESS
129         produced++;
130         log.debug(
131             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
132             id,
133             record.key(),
134             record.value(),
135             metadata.partition(),
136             metadata.offset(),
137             metadata.timestamp(),
138             now - time
139         );
140       }
141       else
142       {
143         // HANDLE ERROR
144         log.error(
145             "{} - ERROR key={} timestamp={} latency={}ms: {}",
146             id,
147             record.key(),
148             metadata == null ? -1 : metadata.timestamp(),
149             now - time,
150             e.toString()
151         );
152       }
153     });
154
155     long now = System.currentTimeMillis();
156     log.trace(
157         "{} - Queued #{} key={} latency={}ms",
158         id,
159         value,
160         record.key(),
161         now - time
162     );
163   }
164
165   public synchronized void start()
166   {
167     if (running)
168       throw new IllegalStateException("Producer instance " + id + " is already running!");
169
170     log.info("{} - Starting - produced {} messages before", id, produced);
171     running = true;
172     executor.submit(this);
173   }
174
175   public synchronized void stop() throws ExecutionException, InterruptedException
176   {
177     if (!running)
178       throw new IllegalStateException("Producer instance " + id + " is not running!");
179
180     log.info("{} - Stopping...", id);
181     running = false;
182   }
183
184   @PreDestroy
185   public void destroy() throws ExecutionException, InterruptedException
186   {
187     log.info("{} - Destroy!", id);
188     try
189     {
190       stop();
191     }
192     catch (IllegalStateException e)
193     {
194       log.info("{} - Was already stopped", id);
195     }
196     finally
197     {
198       log.info("{} - Closing the KafkaProducer", id);
199       producer.close();
200       log.info("{}: Produced {} messages in total, exiting!", id, produced);
201     }
202   }
203 }