1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import java.util.Properties;
/**
 * Minimal Kafka producer that sends String keys and String values
 * through a {@code Producer<String, String>}.
 */
13 public class SimpleProducer
// Identifier used as first argument of the summary log line of this class.
15 private final String id;
// Name of a topic; NOTE(review): presumably the target topic of the records built in send() -- confirm.
16 private final String topic;
// Kafka producer; created in the constructor and used by send().
17 private final Producer<String, String> producer;
// Counter reported as "Produced {} messages in total".
// NOTE(review): the place where it is incremented is not visible here -- confirm.
19 private long produced = 0;
/**
 * Builds the producer configuration and creates the underlying
 * {@link KafkaProducer}.
 *
 * @param broker   value used for the {@code bootstrap.servers} setting
 * @param topic    topic name
 *                 (NOTE(review): presumably stored in the {@code topic} field -- confirm)
 * @param clientId value used for the {@code client.id} setting
 *                 (NOTE(review): presumably also stored in the {@code id} field -- confirm)
 */
21 public SimpleProducer(String broker, String topic, String clientId)
23 Properties props = new Properties();
24 props.put("bootstrap.servers", broker);
25 props.put("client.id", clientId); // Only so that this client can be recognized again
// Keys and values are both serialized as Strings.
26 props.put("key.serializer", StringSerializer.class.getName());
27 props.put("value.serializer", StringSerializer.class.getName());
// Instantiate the producer from the properties assembled above.
31 producer = new KafkaProducer<>(props);
// Calls send() 100000 times: the key cycles through "0".."9" (i % 10),
// the value is the loop counter rendered as a decimal String.
38 for (int i = 0; i < 100000 ; i++)
40 send(Long.toString(i%10), Long.toString(i));
// Summary line: logs the id and the produced-message counter.
45 log.info("{}: Produced {} messages in total, exiting!", id, produced);
/**
 * Hands a record to the producer together with a completion callback and
 * logs the outcome.
 *
 * @param key   key passed in by the caller
 * @param value value passed in by the caller
 */
49 void send(String key, String value)
// Wall-clock time at entry; NOTE(review): presumably the reference point for the logged latencies -- confirm.
51 final long time = System.currentTimeMillis();
53 final ProducerRecord<String, String> record = new ProducerRecord<>(
// The lambda is passed to the producer as callback; it receives the record metadata and an exception.
59 producer.send(record, (metadata, e) ->
// Wall-clock time inside the callback.
61 long now = System.currentTimeMillis();
// Log format for a successfully sent record.
67 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
// Log format for a failed send.
81 "{} - ERROR key={} timestamp={} latency={}ms: {}",
// metadata may be null here; fall back to -1 as timestamp in that case.
84 metadata == null ? -1 : metadata.timestamp(),
// Wall-clock time after producer.send() was called.
91 long now = System.currentTimeMillis();
// Log format for a record that has been handed to the producer.
93 "{} - Queued #{} key={} latency={}ms",
/**
 * Entry point: creates a {@link SimpleProducer} from the broker, topic and
 * client-id settings.
 *
 * @param args command-line arguments
 *             (NOTE(review): presumably used to override the defaults below -- confirm)
 * @throws Exception declared by the signature; the visible code does not show what is thrown
 */
102 public static void main(String[] args) throws Exception
// Initial values for the three settings.
104 String broker = ":9092";
105 String topic = "test";
106 String clientId = "DEV";
118 SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
// Log format reporting the effective settings.
121 "Running SimpleProducer: broker={}, topic={}, client-id={}",