1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
/**
 * Runnable wrapper around a {@code KafkaProducer<String, String>} that sends
 * records inside Kafka transactions: a transaction is begun, records are sent
 * via {@code send(key, value)}, and the transaction is committed once at least
 * {@code commitIntervalMs} milliseconds have passed since the last commit.
 * <p>
 * The instance submits itself to the supplied {@code ExecutorService} in
 * {@code start()}; {@code start()} and {@code stop()} are synchronized.
 * <p>
 * NOTE(review): this listing is an excerpt with lines missing (braces,
 * annotations and some statements are not visible), so the comments below
 * only describe what the visible lines establish.
 */
15 public class EndlessProducer implements Runnable
// Executor this instance is submitted to in start().
17 private final ExecutorService executor;
// Identifier used as the prefix of every log message and in exception texts.
18 private final String id;
// Presumably the topic the records are written to; its use is not visible
// in this excerpt -- TODO confirm.
19 private final String topic;
// Minimum time in milliseconds between two transaction commits.
20 private final int commitIntervalMs;
// Pause in milliseconds passed to Thread.sleep() between sends.
21 private final int throttleMs;
// The producer created in the constructor from the assembled Properties.
22 private final KafkaProducer<String, String> producer;
// NOTE(review): plain (non-volatile) flag. start()/stop() are synchronized,
// but if the worker polls this field without holding the monitor, a write
// from stop() is not guaranteed to become visible to it. The read site is
// not visible in this excerpt -- confirm, and consider making it volatile.
24 private boolean running = false;
// Counter reported in the log messages as the number of produced messages.
26 private long produced = 0;
// Wall-clock time (System.currentTimeMillis()) used as the reference point
// for the commit-interval check.
27 private long lastCommit;
/**
 * Stores the configuration values and creates the {@code KafkaProducer}.
 * <p>
 * The producer is configured with the given bootstrap server, the given
 * {@code acks} setting and {@code StringSerializer} for keys and values.
 * {@code clientId} is used both as {@code client.id} and as
 * {@code transactional.id}.
 * <p>
 * NOTE(review): only part of the parameter list and of the field
 * assignments is visible in this excerpt ({@code clientId}, {@code acks},
 * {@code commitIntervalMs} and {@code throttleMs} are referenced below but
 * their declarations are not shown).
 *
 * @param executor executor the instance is later submitted to
 * @param bootstrapServer value for the {@code bootstrap.servers} setting
 */
29 public EndlessProducer(
30 ExecutorService executor,
31 String bootstrapServer,
38 this.executor = executor;
41 this.commitIntervalMs = commitIntervalMs;
42 this.throttleMs = throttleMs;
44 Properties props = new Properties();
45 props.put("bootstrap.servers", bootstrapServer);
// The same identifier doubles as transactional.id and client.id.
46 props.put("transactional.id", clientId);
47 props.put("client.id", clientId);
// NOTE(review): acks is caller-supplied while a transactional.id is set.
// Per the Kafka producer configuration docs a transactional (idempotent)
// producer requires acks=all -- confirm callers never pass another value.
48 props.put("acks", acks);
49 props.put("key.serializer", StringSerializer.class.getName());
50 props.put("value.serializer", StringSerializer.class.getName());
52 this.producer = new KafkaProducer<>(props);
// Body of the worker routine. Its header is not visible in this excerpt;
// since the class implements Runnable and submits itself to the executor,
// this is presumably run() -- TODO confirm.
//
// Flow established by the visible lines: initialise transactions, begin a
// transaction, send records, commit and re-begin whenever commitIntervalMs
// has elapsed, commit once more at the end, abort on an unexpected
// exception, and finally log how many messages were produced.
60 producer.initTransactions();
// Reference point for the first commit-interval check.
62 lastCommit = System.currentTimeMillis();
63 log.info("{} - Beginning transaction", id);
64 producer.beginTransaction();
// Key is the counter modulo 10, value is the counter itself, both rendered
// as decimal strings. The declaration and increment of `i` are not visible
// in this excerpt.
68 send(Long.toString(i%10), Long.toString(i));
// Throttle between two sends.
74 Thread.sleep(throttleMs);
76 catch (InterruptedException e)
// NOTE(review): unlike every other log call in this class, `id` is not
// passed here. The exception is the only argument, so the `{}` placeholder
// is never filled with the instance id. Likely intended:
//   log.warn("{} - Interrupted while throttling!", id, e);
// NOTE(review): no Thread.currentThread().interrupt() is visible after the
// catch, so the interrupt status appears to be dropped -- confirm.
78 log.warn("{} - Interrupted while throttling!", e);
// Commit the running transaction and open a new one as soon as at least
// commitIntervalMs milliseconds have passed since lastCommit.
82 long now = System.currentTimeMillis();
83 if (now - lastCommit >= commitIntervalMs)
85 log.info("{} - Commiting transaction", id);
86 producer.commitTransaction();
88 log.info("{} - Beginning new transaction", id);
89 producer.beginTransaction();
// Final commit of the transaction that is still open when sending ends.
93 log.info("{} - Commiting transaction", id);
94 producer.commitTransaction();
95 log.info("{} - Done", id);
// Error path: log the exception, then abort the open transaction.
// NOTE(review): the KafkaProducer documentation states that after
// ProducerFencedException, OutOfOrderSequenceException or
// AuthorizationException the producer cannot abort and must be closed;
// abortTransaction() may then throw from this handler. The catch clause is
// not visible in this excerpt -- confirm which exceptions reach this point.
99 log.error("{} - Unexpected Exception:", id, e);
100 log.info("{} - Aborting transaction", id);
101 producer.abortTransaction();
// Reports the number of messages produced up to this point.
108 log.info("{} - Stopped - produced {} messages so far", id, produced);
/**
 * Builds a {@code ProducerRecord<String, String>} and hands it to the
 * producer together with a completion callback.
 * <p>
 * The callback logs either a "Sent" message (including partition and
 * timestamp taken from the record metadata) or an "ERROR" message. After
 * {@code producer.send(...)} has returned, a "Queued" message is logged.
 * Timestamps are taken before building the record, inside the callback and
 * after the send call; presumably they feed the {@code latency={}ms}
 * placeholders, but the argument lists are not visible in this excerpt.
 *
 * @param key presumably the record key (constructor arguments not visible)
 * @param value presumably the record value (constructor arguments not visible)
 */
113 void send(String key, String value)
// Start time of this send operation.
115 final long time = System.currentTimeMillis();
117 final ProducerRecord<String, String> record = new ProducerRecord<>(
// The callback runs when the send has completed: `e` carries the error,
// if any, and `metadata` the record metadata.
123 producer.send(record, (metadata, e) ->
// Completion time as seen by the callback.
125 long now = System.currentTimeMillis();
131 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
135 metadata.partition(),
137 metadata.timestamp(),
145 "{} - ERROR key={} timestamp={} latency={}ms: {}",
// metadata may be null here, hence the guard; -1 is logged in that case.
148 metadata == null ? -1 : metadata.timestamp(),
// Time at which producer.send(...) returned, i.e. the record was handed
// over to the producer.
155 long now = System.currentTimeMillis();
157 "{} - Queued #{} key={} latency={}ms",
/**
 * Submits this instance to the executor.
 * <p>
 * The method is synchronized on the instance. The guard condition and any
 * change of the {@code running} flag are not visible in this excerpt.
 *
 * @throws IllegalStateException with an "already running" message;
 *         presumably when {@code running} is already set -- TODO confirm
 */
165 public synchronized void start()
168 throw new IllegalStateException("Producer instance " + id + " is already running!");
170 log.info("{} - Starting - produced {} messages before", id, produced);
// The executor invokes this instance as a Runnable.
172 executor.submit(this);
/**
 * Requests the instance to stop and logs the request.
 * <p>
 * The method is synchronized on the instance. The guard condition and the
 * actual state change are not visible in this excerpt.
 * <p>
 * NOTE(review): the signature declares {@code ExecutionException} and
 * {@code InterruptedException}, but no visible statement throws them --
 * confirm whether the throws clause is still needed.
 *
 * @throws IllegalStateException with a "not running" message; presumably
 *         when the instance is not running -- TODO confirm
 */
175 public synchronized void stop() throws ExecutionException, InterruptedException
178 throw new IllegalStateException("Producer instance " + id + " is not running!");
180 log.info("{} - Stopping...", id);
/**
 * Shutdown hook of the instance: logs the shutdown, tolerates an
 * {@code IllegalStateException} (logged as "Was already stopped"), and
 * logs the closing of the producer and the total number of produced
 * messages.
 * <p>
 * NOTE(review): {@code javax.annotation.PreDestroy} is imported, so this
 * method is presumably annotated with it; the annotation, the guarded call
 * (presumably {@code stop()}) and the call that closes the producer are not
 * visible in this excerpt -- TODO confirm.
 */
185 public void destroy() throws ExecutionException, InterruptedException
187 log.info("{} - Destroy!", id);
// Best effort: an instance that is not running is not treated as an error.
192 catch (IllegalStateException e)
194 log.info("{} - Was already stopped", id);
198 log.info("{} - Closing the KafkaProducer", id);
200 log.info("{}: Produced {} messages in total, exiting!", id, produced);