1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
8 import java.util.Properties;
12 public class SimpleProducer
14 public static void main(String[] args) throws Exception
17 Properties props = new Properties();
18 props.put("bootstrap.servers", "localhost:9092");
19 props.put("key.serializer", StringSerializer.class.getName());
20 props.put("value.serializer", StringSerializer.class.getName());
22 KafkaProducer<String, String> producer = new KafkaProducer<>(props);
32 final long time = System.currentTimeMillis();
34 final ProducerRecord<String, String> record = new ProducerRecord<>(
36 Long.toString(i%10), // Key
37 Long.toString(i) // Value
40 producer.send(record, (metadata, e) ->
42 long now = System.currentTimeMillis();
47 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
61 "{} - ERROR key={} timestamp={} latency={}ms: {}",
64 metadata == null ? -1 : metadata.timestamp(),
71 long now = System.currentTimeMillis();
73 "{} - Queued #{} key={} latency={}ms",
83 log.info("{}: Closing the KafkaProducer", id);
85 log.info("{}: Exiting!", id);