1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
9 import java.util.Properties;
13 public class SimpleProducer
15 private final String id;
16 private final String topic;
17 private final Producer<String, String> producer;
19 private long produced = 0;
21 public SimpleProducer(String clientId, String topic)
23 Properties props = new Properties();
24 // TODO: Konfiguration für Producer zusammenstellen
28 this.producer = null; // TODO: Eine Instanz von KafkaProducer erzeugen
39 send(Long.toString(i%10), Long.toString(i));
42 log.info("{} - Done", id);
46 log.info("{}: Closing the KafkaProducer", id);
48 log.info("{}: Produced {} messages in total, exiting!", id, produced);
52 void send(String key, String value)
54 final long time = System.currentTimeMillis();
56 final ProducerRecord<String, String> record = null; // TODO:
57 // Instanz von ProducerRecord mit key & value aus den Parametern
58 // der Methode erzeugen und diesen an das Topic aus dem Attribut
59 // this.topic versenden.
60 // Dabei den Callback für die Ausgabe von Erfolg/Fehlern nutzen.
61 // Für ganz Schnelle: Versanddauer im Callback messen und ausgeben.
63 long now = System.currentTimeMillis();
65 "{} - Queued #{} key={} latency={}ms",
74 public static void main(String[] args) throws Exception
76 SimpleProducer producer = new SimpleProducer("P", "test");