1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
/**
 * Small Kafka client that sends String-keyed, String-valued records through a
 * {@link Producer} and logs what happens to them.
 */
15 public class SimpleProducer
// Identifier printed as the first "{}" argument of the log messages in this class
// (e.g. "{} - Done", "{}: Closing the KafkaProducer").
17 private final String id;
// NOTE(review): presumably the topic the records built in send() are addressed to —
// the ProducerRecord arguments are not visible here; confirm.
18 private final String topic;
// Producer that send() hands records to; assigned in the constructor.
19 private final Producer<String, String> producer;
// Message counter reported in the final "Produced {} messages in total" log line.
// NOTE(review): where it is incremented is not visible in this view.
21 private long produced = 0;
/**
 * Creates the underlying {@link KafkaProducer} with a fixed configuration and
 * stores it in the {@code producer} field.
 *
 * @param clientId NOTE(review): presumably stored in {@code id}; the assignment is not visible here — confirm
 * @param topic    NOTE(review): presumably stored in {@code this.topic}; the assignment is not visible here — confirm
 */
23 public SimpleProducer(String clientId, String topic)
26 Properties props = new Properties();
// The broker address is hard-coded to localhost:9092.
27 props.put("bootstrap.servers", "localhost:9092");
// Keys and values both use the String serializer, matching Producer<String, String>.
28 props.put("key.serializer", StringSerializer.class.getName());
29 props.put("value.serializer", StringSerializer.class.getName());
// Local variable with the same name as the field; it is copied into the field below.
31 Producer<String, String> producer = new KafkaProducer<>(props);
36 this.producer = producer;
47 send(Long.toString(i%10), Long.toString(i));
50 log.info("{} - Done", id);
54 log.info("{}: Closing the KafkaProducer", id);
56 log.info("{}: Produced {} messages in total, exiting!", id, produced);
/**
 * Builds a {@link ProducerRecord} and passes it to {@code producer.send} together
 * with a callback; log message templates exist for the sent, error and queued cases.
 *
 * @param key   NOTE(review): presumably the record key (logged as "key={}") — the record's constructor arguments are not visible here; confirm
 * @param value NOTE(review): presumably the record value (logged as "message={}") — confirm
 */
60 void send(String key, String value)
// Timestamp taken before the record is built.
// NOTE(review): presumably the baseline for the "latency={}ms" values below — the subtraction is not visible here.
62 final long time = System.currentTimeMillis();
64 final ProducerRecord<String, String> record = new ProducerRecord<>(
// The lambda is the callback passed to send(); it receives the record metadata and an exception.
70 producer.send(record, (metadata, e) ->
// Timestamp taken inside the callback.
72 long now = System.currentTimeMillis();
// Message template for a successfully sent record.
78 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
// Message template for a failed send.
92 "{} - ERROR key={} timestamp={} latency={}ms: {}",
// metadata may be null here; -1 is then logged in place of the timestamp.
95 metadata == null ? -1 : metadata.timestamp(),
// Second, separate "now".
// NOTE(review): appears to be taken after producer.send(...) has returned, outside the callback — confirm.
102 long now = System.currentTimeMillis();
// Message template for a record that has been handed to the producer.
104 "{} - Queued #{} key={} latency={}ms",
/**
 * Program entry point: creates a {@code SimpleProducer} with client id "P" and topic "test".
 *
 * @param args command-line arguments; not used in the visible lines
 * @throws Exception declared by the signature; the throwing code is not visible here
 */
113 public static void main(String[] args) throws Exception
115 SimpleProducer producer = new SimpleProducer("P", "test");