WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10 import java.util.concurrent.CompletableFuture;
11
12
13 @Slf4j
14 public class SimpleProducer
15 {
16   private final String id;
17   private final String topic;
18   private final Producer<String, String> producer;
19
20   private long produced = 0;
21   private volatile boolean running = true;
22   private volatile boolean done = false;
23
24   public SimpleProducer(String broker, String topic, String clientId)
25   {
26     Properties props = new Properties();
27     props.put("bootstrap.servers", broker);
28     props.put("client.id", clientId); // Nur zur Wiedererkennung
29     props.put("transactional.id", clientId); // Aktiviert außerdem enable.idempotence=true
30     props.put("key.serializer", StringSerializer.class.getName());
31     props.put("value.serializer", StringSerializer.class.getName());
32
33     this.id = clientId;
34     this.topic = topic;
35     producer = new KafkaProducer<>(props);
36     producer.initTransactions();
37   }
38
39   public void run()
40   {
41     long i = 0;
42
43     try
44     {
45       for (; running ; i++)
46       {
47         send(Long.toString(i%10), Long.toString(i));
48         Thread.sleep(500);
49       }
50     }
51     catch (InterruptedException e) {}
52     finally
53     {
54       log.info("{}: Closing the KafkaProducer", id);
55       producer.close();
56       log.info("{}: Produced {} messages in total, exiting!", id, produced);
57       done = true;
58     }
59   }
60
61   CompletableFuture<Long> send(String key, String value)
62   {
63     final CompletableFuture<Long> result = new CompletableFuture<>();
64     final long time = System.currentTimeMillis();
65
66     final ProducerRecord<String, String> record = new ProducerRecord<>(
67         topic,  // Topic
68         key,    // Key
69         value   // Value
70     );
71
72     producer.send(record, (metadata, e) ->
73     {
74       long now = System.currentTimeMillis();
75       if (e == null)
76       {
77         // HANDLE SUCCESS
78         produced++;
79         log.debug(
80             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
81             id,
82             record.key(),
83             record.value(),
84             metadata.partition(),
85             metadata.offset(),
86             metadata.timestamp(),
87             now - time
88         );
89         result.complete(metadata.offset());
90       }
91       else
92       {
93         // HANDLE ERROR
94         log.error(
95             "{} - ERROR key={} timestamp={} latency={}ms: {}",
96             id,
97             record.key(),
98             metadata == null ? -1 : metadata.timestamp(),
99             now - time,
100             e.toString()
101         );
102         result.completeExceptionally(e);
103       }
104     });
105
106     long now = System.currentTimeMillis();
107     log.trace(
108         "{} - Queued #{} key={} latency={}ms",
109         id,
110         value,
111         record.key(),
112         now - time
113     );
114
115     return result;
116   }
117
118
119   public static void main(String[] args) throws Exception
120   {
121     String broker = ":9092";
122     String topic = "test";
123     String clientId = "DEV";
124
125     switch (args.length)
126     {
127       case 3:
128         clientId = args[2];
129       case 2:
130         topic = args[1];
131       case 1:
132         broker = args[0];
133     }
134
135     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
136
137     Runtime.getRuntime().addShutdownHook(new Thread(() ->
138     {
139       instance.running = false;
140       while (!instance.done)
141       {
142         log.info("Waiting for main-thread...");
143         try
144         {
145           Thread.sleep(1000);
146         }
147         catch (InterruptedException e) {}
148       }
149       log.info("Shutdown completed.");
150     }));
151
152     log.info(
153         "Running SimpleProducer: broker={}, topic={}, client-id={}",
154         broker,
155         topic,
156         clientId);
157     instance.run();
158   }
159 }