1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.clients.producer.RecordMetadata;
6 import org.springframework.kafka.core.KafkaTemplate;
7 import org.springframework.kafka.support.SendResult;
8 import org.springframework.util.concurrent.ListenableFuture;
10 import javax.annotation.PreDestroy;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.ExecutorService;
16 public class EndlessProducer implements Runnable
18 private final ExecutorService executor;
19 private final String id;
20 private final int throttleMs;
21 private final KafkaTemplate<String, String> kafkaTemplate;
23 private boolean running = false;
25 private long produced = 0;
27 public EndlessProducer(
28 ExecutorService executor,
31 KafkaTemplate<String, String> kafkaTemplate)
33 this.executor = executor;
35 this.throttleMs = throttleMs;
36 this.kafkaTemplate = kafkaTemplate;
46 send(Long.toString(i%10), Long.toString(i));
52 Thread.sleep(throttleMs);
54 catch (InterruptedException e)
56 log.warn("{} - Interrupted while throttling!", e);
61 log.info("{} - Done", id);
65 log.error("{} - Unexpected Exception:", id, e);
72 log.info("{} - Stopped - produced {} messages so far", id, produced);
77 void send(String key, String value)
79 final long time = System.currentTimeMillis();
81 ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.sendDefault(key, value);
82 listenableFuture.addCallback(
85 long now = System.currentTimeMillis();
86 RecordMetadata metadata = result.getRecordMetadata();
87 ProducerRecord<String, String> record = result.getProducerRecord();
92 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
104 long now = System.currentTimeMillis();
108 "{} - ERROR key={} latency={}ms: {}",
116 long now = System.currentTimeMillis();
118 "{} - Queued #{} key={} latency={}ms",
126 public synchronized void start()
129 throw new IllegalStateException("Producer instance " + id + " is already running!");
131 log.info("{} - Starting - produced {} messages before", id, produced);
133 executor.submit(this);
136 public synchronized void stop() throws ExecutionException, InterruptedException
139 throw new IllegalStateException("Producer instance " + id + " is not running!");
141 log.info("{} - Stopping...", id);
146 public void destroy() throws ExecutionException, InterruptedException
148 log.info("{} - Destroy!", id);
153 catch (IllegalStateException e)
155 log.info("{} - Was already stopped", id);
159 log.info("{}: Produced {} messages in total, exiting!", id, produced);