1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.LongSerializer;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import javax.annotation.PreDestroy;
10 import java.util.Properties;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.ExecutorService;
// NOTE(review): only part of this class is visible in this view (braces, annotations and
// several short statements are not shown); the comments below describe the visible lines only.
/**
 * A {@link Runnable} that sends records with {@code String} keys and {@code Long} values
 * through a {@link KafkaProducer} and submits itself to an {@link ExecutorService} in
 * {@code start()}.
 */
16 public class EndlessProducer implements Runnable
// Executor this instance is submitted to in start().
18 private final ExecutorService executor;
// Identifier passed as the first argument to most log statements of this class.
19 private final String id;
// NOTE(review): presumably the name of the topic the records are sent to - neither the
// assignment nor the use of this field is visible here, confirm.
20 private final String topic;
// Number of milliseconds passed to Thread.sleep() between sends.
21 private final int throttleMs;
// Kafka client that is created in the constructor and used by send().
22 private final KafkaProducer<String, Long> producer;
// Starts out as false. NOTE(review): presumably the flag behind the "is already running" /
// "is not running" checks in start() and stop() - the checks themselves are not visible, confirm.
24 private boolean running = false;
// Counter that is reported in the "Starting", "Stopped" and "Produced ... in total" log messages.
26 private long produced = 0;
/**
 * Stores the given settings and creates the {@link KafkaProducer}.
 *
 * @param executor stored in the field of the same name, used by start()
 * @param bootstrapServer value for the producer setting {@code bootstrap.servers}
 *
 * NOTE(review): the remaining parameters are not visible in this view. The body refers to
 * {@code clientId} (setting {@code client.id}), {@code acks} (setting {@code acks}) and
 * {@code throttleMs} (stored in the field of the same name).
 */
28 public EndlessProducer(
29 ExecutorService executor,
30 String bootstrapServer,
36 this.executor = executor;
39 this.throttleMs = throttleMs;
// Configuration for the Kafka client: connection, client id and acknowledgement mode
// come from the constructor arguments.
41 Properties props = new Properties();
42 props.put("bootstrap.servers", bootstrapServer);
43 props.put("client.id", clientId);
44 props.put("acks", acks);
// Keys are serialized as strings and values as longs, matching KafkaProducer<String, Long>.
45 props.put("key.serializer", StringSerializer.class.getName());
46 props.put("value.serializer", LongSerializer.class.getName());
48 this.producer = new KafkaProducer<>(props);
// The value is the counter i, the key is i % 10 rendered as a decimal string.
// NOTE(review): the declaration of i and the surrounding loop are not visible in this view.
58 send(Long.toString(i%10), i);
// Throttling: pause for throttleMs milliseconds.
64 Thread.sleep(throttleMs);
66 catch (InterruptedException e)
// NOTE(review): the message has one {} placeholder, but the only argument is the exception e.
// All other "{} - ..." log statements of this class pass id first, so this was probably meant
// to be (..., id, e) - confirm. No re-interrupt of the thread is visible here either.
68 log.warn("{} - Interrupted while throttling!", e);
73 log.info("{} - Done", id);
// The exception is passed as an extra trailing argument after the one for the placeholder.
77 log.error("{} - Unexpected Exception:", id, e);
84 log.info("{} - Stopped - produced {} messages so far", id, produced);
/**
 * Builds a {@link ProducerRecord} and hands it, together with a callback, to the
 * {@link KafkaProducer}.
 *
 * @param key key of the record
 * @param value value of the record
 */
89 void send(String key, Long value)
// Reference point for the latency values in the log messages below.
91 final long time = System.currentTimeMillis();
93 final ProducerRecord<String, Long> record = new ProducerRecord<>(
// The lambda is passed as second argument to producer.send(); it receives the record metadata
// and an exception.
99 producer.send(record, (metadata, e) ->
// Time at which the callback runs.
101 long now = System.currentTimeMillis();
// Message for a successfully sent record.
107 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
111 metadata.partition(),
113 metadata.timestamp(),
// Message for a failed send.
121 "{} - ERROR key={} timestamp={} latency={}ms: {}",
// metadata may be null here, in which case -1 is logged instead of a timestamp.
124 metadata == null ? -1 : metadata.timestamp(),
// NOTE(review): presumably taken right after producer.send() has returned, for the "Queued"
// message below - the end of the lambda is not visible in this view, confirm.
131 long now = System.currentTimeMillis();
133 "{} - Queued #{} key={} latency={}ms",
/**
 * Logs the number of messages produced before and submits this instance to the executor.
 *
 * @throws IllegalStateException with the message "... is already running!"
 *         (NOTE(review): the condition is not visible in this view)
 */
141 public synchronized void start()
144 throw new IllegalStateException("Producer instance " + id + " is already running!");
146 log.info("{} - Starting - produced {} messages before", id, produced);
// Hands this Runnable to the executor; the returned Future is not kept on this line.
148 executor.submit(this);
/**
 * Logs that the instance is stopping.
 * NOTE(review): the statements that actually stop the instance are not visible in this view.
 *
 * @throws IllegalStateException with the message "... is not running!"
 *         (NOTE(review): the condition is not visible in this view)
 */
151 public synchronized void stop() throws ExecutionException, InterruptedException
154 throw new IllegalStateException("Producer instance " + id + " is not running!");
156 log.info("{} - Stopping...", id);
/**
 * Logs the shutdown steps of this instance, ending with the total number of produced messages.
 * NOTE(review): the guarded call (presumably stop()) and the call that closes the
 * KafkaProducer are not visible in this view, confirm.
 */
161 public void destroy() throws ExecutionException, InterruptedException
163 log.info("{} - Destroy!", id);
// An IllegalStateException is not propagated here, it is only logged at info level.
168 catch (IllegalStateException e)
170 log.info("{} - Was already stopped", id);
174 log.info("{} - Closing the KafkaProducer", id);
176 log.info("{}: Produced {} messages in total, exiting!", id, produced);