1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import java.util.Properties;
13 public class SimpleProducer
15 private final String id;
16 private final String topic;
17 private final Producer<String, String> producer;
19 private long produced = 0;
20 private volatile boolean running = true;
21 private volatile boolean done = false;
23 public SimpleProducer(String broker, String topic, String clientId)
25 Properties props = new Properties();
26 // TODO: Konfiguration für den Producer zusammenstellen
30 producer = new KafkaProducer<>(props);
41 send(Long.toString(i%10), Long.toString(i));
45 catch (InterruptedException e) {}
48 log.info("{}: Closing the KafkaProducer", id);
50 log.info("{}: Produced {} messages in total, exiting!", id, produced);
55 void send(String key, String value)
57 final long time = System.currentTimeMillis();
59 ProducerRecord<String, String> record = null;
62 // 1. ProducerRecord<String, String> erzeugen
63 // 2. Record mit producer.send(...) verschicken
65 // * Callback implementieren und Erfolg/Fehler logge
66 // * Tipp: Aktuellen Zeitstempel auch im Callback erzeugen und
67 // Millisekunden berechnen, die bis zum Senden verstrichen sind:
68 // long now = System.currentTimeMillis(); // Für: now - time
70 long now = System.currentTimeMillis();
72 "{} - Queued #{} key={} latency={}ms",
81 public static void main(String[] args) throws Exception
83 String broker = ":9092";
84 String topic = "test";
85 String clientId = "DEV";
97 SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
99 Runtime.getRuntime().addShutdownHook(new Thread(() ->
101 instance.running = false;
102 while (!instance.done)
104 log.info("Waiting for main-thread...");
109 catch (InterruptedException e) {}
111 log.info("Shutdown completed.");
115 "Running SimpleProducer: broker={}, topic={}, client-id={}",