1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
8 import java.util.Properties;
12 public class SimpleProducer
14 private final String id;
15 private final String topic;
16 private final KafkaProducer<String, String> producer;
18 private long produced = 0;
19 private volatile boolean running = true;
20 private volatile boolean done = false;
22 public SimpleProducer(String broker, String topic, String clientId)
24 Properties props = new Properties();
25 props.put("bootstrap.servers", broker);
26 props.put("client.id", clientId); // Nur zur Wiedererkennung
27 props.put("key.serializer", StringSerializer.class.getName());
28 props.put("value.serializer", StringSerializer.class.getName());
30 producer = new KafkaProducer<>(props);
44 send(Long.toString(i%10), Long.toString(i));
48 catch (InterruptedException e) {}
51 log.info("{}: Closing the KafkaProducer", id);
53 log.info("{}: Produced {} messages in total, exiting!", id, produced);
58 void send(String key, String value)
60 final long time = System.currentTimeMillis();
63 // 1. ProducerRecord<String, String> erzeugen
64 // 2. Record mit producer.send(...) verschicken
66 // * Callback implementieren und Erfolg/Fehler logge
67 // * Tipp: Aktuellen Zeitstempel auch im Callback erzeugen und
68 // Millisekunden berechnen, die bis zum Senden verstrichen sind:
69 // long now = System.currentTimeMillis(); // Für: now - time
71 long now = System.currentTimeMillis();
73 "{} - Queued #{} key={} latency={}ms",
82 public static void main(String[] args) throws Exception
84 String broker = ":9092";
85 String topic = "test";
86 String clientId = "DEV";
98 SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
100 Runtime.getRuntime().addShutdownHook(new Thread(() ->
102 instance.running = false;
103 while (!instance.done)
105 log.info("Waiting for main-thread...");
110 catch (InterruptedException e) {}
112 log.info("Shutdown completed.");
116 "Running SimpleProducer: broker={}, topic={}, client-id={}",