1ebc851b1be42b14d5c2d3552654fc3f362c6747
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10
11
12 @Slf4j
13 public class SimpleProducer
14 {
15   private final String id;
16   private final String topic;
17   private final Producer<String, String> producer;
18
19   private long produced = 0;
20
21   public SimpleProducer(String clientId, String topic)
22   {
23     Properties props = new Properties();
24     // TODO: Konfiguration für Producer zusammenstellen
25
26     this.id = clientId;
27     this.topic = topic;
28     this.producer = null; // TODO: Eine Instanz von KafkaProducer erzeugen
29   }
30
31   public void run()
32   {
33     long i = 0;
34
35     try
36     {
37       for (; i < 100 ; i++)
38       {
39         send(Long.toString(i%10), Long.toString(i));
40       }
41
42       log.info("{} - Done", id);
43     }
44     finally
45     {
46       log.info("{}: Closing the KafkaProducer", id);
47       producer.close();
48       log.info("{}: Produced {} messages in total, exiting!", id, produced);
49     }
50   }
51
52   void send(String key, String value)
53   {
54     final long time = System.currentTimeMillis();
55
56     final ProducerRecord<String, String> record = null; // TODO:
57     // Instanz von ProducerRecord mit key & value aus den Parametern
58     // der Methode erzeugen und diesen an das Topic aus dem Attribut
59     // this.topic versenden.
60     // Dabei den Callback für die Ausgabe von Erfolg/Fehlern nutzen.
61     // Für ganz Schnelle: Versanddauer im Callback messen und ausgeben.
62
63     long now = System.currentTimeMillis();
64     log.trace(
65         "{} - Queued #{} key={} latency={}ms",
66         id,
67         value,
68         record.key(),
69         now - time
70     );
71   }
72
73
74   public static void main(String[] args) throws Exception
75   {
76     SimpleProducer producer = new SimpleProducer("P", "test");
77     producer.run();
78   }
79 }