Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import java.util.Properties;
9
10
11 @Slf4j
12 public class SimpleProducer
13 {
14   private final String id;
15   private final String topic;
16   private final KafkaProducer<String, String> producer;
17
18   private long produced = 0;
19   private volatile boolean running = true;
20   private volatile boolean done = false;
21
22   public SimpleProducer(String broker, String topic, String clientId)
23   {
24     Properties props = new Properties();
25     props.put("bootstrap.servers", broker);
26     props.put("client.id", clientId); // Nur zur Wiedererkennung
27     props.put("key.serializer", StringSerializer.class.getName());
28     props.put("value.serializer", StringSerializer.class.getName());
29
30     producer = new KafkaProducer<>(props);
31
32     this.topic = topic;
33     this.id = clientId;
34   }
35
36   public void run()
37   {
38     long i = 0;
39
40     try
41     {
42       for (; running ; i++)
43       {
44         send(Long.toString(i%10), Long.toString(i));
45         Thread.sleep(500);
46       }
47     }
48     catch (InterruptedException e) {}
49     finally
50     {
51       log.info("{}: Closing the KafkaProducer", id);
52       producer.close();
53       log.info("{}: Produced {} messages in total, exiting!", id, produced);
54       done = true;
55     }
56   }
57
58   void send(String key, String value)
59   {
60     final long time = System.currentTimeMillis();
61
62     // Aufgabe:
63     // 1. ProducerRecord<String, String> erzeugen
64     // 2. Record mit producer.send(...) verschicken
65     // Dabei beachten:
66     // * Callback implementieren und Erfolg/Fehler logge
67     // * Tipp: Aktuellen Zeitstempel auch im Callback erzeugen und
68     //   Millisekunden berechnen, die bis zum Senden verstrichen sind:
69     //   long now = System.currentTimeMillis(); // Für: now - time
70
71     long now = System.currentTimeMillis();
72     log.trace(
73         "{} - Queued #{} key={} latency={}ms",
74         id,
75         value,
76         record.key(),
77         now - time
78     );
79   }
80
81
82   public static void main(String[] args) throws Exception
83   {
84     String broker = ":9092";
85     String topic = "test";
86     String clientId = "DEV";
87
88     switch (args.length)
89     {
90       case 3:
91         clientId = args[2];
92       case 2:
93         topic = args[1];
94       case 1:
95         broker = args[0];
96     }
97
98     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
99
100     Runtime.getRuntime().addShutdownHook(new Thread(() ->
101     {
102       instance.running = false;
103       while (!instance.done)
104       {
105         log.info("Waiting for main-thread...");
106         try
107         {
108           Thread.sleep(1000);
109         }
110         catch (InterruptedException e) {}
111       }
112       log.info("Shutdown completed.");
113     }));
114
115     log.info(
116         "Running SimpleProducer: broker={}, topic={}, client-id={}",
117         broker,
118         topic,
119         clientId);
120     instance.run();
121   }
122 }