1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import java.util.Properties;
10 import java.util.concurrent.CompletableFuture;
14 public class SimpleProducer
// Identifier of this instance; it is the first argument of the log messages written by this class.
16 private final String id;
// Kafka topic name. NOTE(review): presumably the constructor's topic argument - the assignment is not visible here, confirm.
17 private final String topic;
// Kafka client created in the constructor; send(...) hands its records to it.
18 private final Producer<String, String> producer;
// Message counter reported in the final "Produced {} messages in total" log line.
// Unlike the two flags below it is not volatile.
20 private long produced = 0;
// Set to false by the JVM shutdown hook registered in main() to request a stop.
21 private volatile boolean running = true;
// Polled by the shutdown hook, which keeps waiting while this is false.
22 private volatile boolean done = false;
/**
 * Creates a producer without a transactional id.
 * <p>
 * Delegates to the four-argument constructor and passes {@code null}
 * as the transactional id.
 *
 * @param broker   forwarded unchanged to the four-argument constructor
 * @param topic    forwarded unchanged to the four-argument constructor
 * @param clientId forwarded unchanged to the four-argument constructor
 */
public SimpleProducer(String broker, String topic, String clientId)
{
  // The cast only spells out the type of the missing transactional id.
  this(broker, topic, clientId, (String) null);
}
29 public SimpleProducer(String broker, String topic, String clientId, String transactionalId)
31 Properties props = new Properties();
32 props.put("bootstrap.servers", broker);
33 props.put("client.id", clientId); // Nur zur Wiedererkennung
34 if (transactionalId != null)
35 props.put("transactional.id", transactionalId); // Aktiviert außerdem enable.idempotence=true
36 props.put("key.serializer", StringSerializer.class.getName());
37 props.put("value.serializer", StringSerializer.class.getName());
41 producer = new KafkaProducer<>(props);
42 producer.initTransactions();
// Sends the counter i as message value; the key is i modulo 10.
// NOTE(review): the enclosing method header and loop are not visible in this excerpt.
53 send(Long.toString(i%10), Long.toString(i));
// NOTE(review): the InterruptedException is swallowed without any handling. The matching
// try block is not visible here - confirm that ending quietly on interruption is intended;
// otherwise restore the interrupt flag with Thread.currentThread().interrupt().
57 catch (InterruptedException e) {}
// Announces the shutdown of the producer. NOTE(review): the close() call itself is not visible here.
60 log.info("{}: Closing the KafkaProducer", id);
// Final summary: reports the value of the produced counter.
62 log.info("{}: Produced {} messages in total, exiting!", id, produced);
// Opens a transaction on the producer.
// NOTE(review): the enclosing method and the statements between begin and commit are not
// visible in this excerpt. Both calls require a producer configured with a
// transactional.id - confirm that this path is only reached for such a producer.
69 producer.beginTransaction();
// Commits the transaction opened above.
// NOTE(review): no abortTransaction() is visible for the failure case - verify in the full method.
74 producer.commitTransaction();
// Hands one record to the producer and reports the outcome through a CompletableFuture:
// the producer callback completes it with metadata.offset(), or completes it exceptionally
// with the exception it received.
// NOTE(review): large parts of this method are not visible in this excerpt - the
// ProducerRecord arguments, the condition that selects between the two outcomes, the
// logging calls around the format strings and the return statement. Confirm against the full file.
77 CompletableFuture<Long> send(String key, String value)
// Future that carries the result of the asynchronous send.
79 final CompletableFuture<Long> result = new CompletableFuture<>();
// Time at which the send was started.
// NOTE(review): presumably the reference point for the latency values in the log messages - confirm.
80 final long time = System.currentTimeMillis();
82 final ProducerRecord<String, String> record = new ProducerRecord<>(
// Asynchronous send; the lambda receives the record metadata and an exception.
88 producer.send(record, (metadata, e) ->
// Time at which the callback runs.
90 long now = System.currentTimeMillis();
// Log format for a record that was sent.
96 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
100 metadata.partition(),
102 metadata.timestamp(),
// The future yields the offset taken from the record metadata.
105 result.complete(metadata.offset());
// Log format for a failed send.
111 "{} - ERROR key={} timestamp={} latency={}ms: {}",
// metadata can be null here, hence the fallback to -1.
114 metadata == null ? -1 : metadata.timestamp(),
// The exception from the callback is passed on to whoever waits on the future.
118 result.completeExceptionally(e);
// Second time stamp, taken outside the callback.
122 long now = System.currentTimeMillis();
// Log format for a record that was queued.
124 "{} - Queued #{} key={} latency={}ms",
135 public static void main(String[] args) throws Exception
137 String broker = ":9092";
138 String topic = "test";
139 String clientId = "DEV";
151 SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
153 Runtime.getRuntime().addShutdownHook(new Thread(() ->
155 instance.running = false;
156 while (!instance.done)
158 log.info("Waiting for main-thread...");
163 catch (InterruptedException e) {}
165 log.info("Shutdown completed.");
169 "Running SimpleProducer: broker={}, topic={}, client-id={}",