WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10 import java.util.concurrent.CompletableFuture;
11
12
13 @Slf4j
14 public class SimpleProducer
15 {
16   private final String id;
17   private final String topic;
18   private final Producer<String, String> producer;
19
20   private long produced = 0;
21   private volatile boolean running = true;
22   private volatile boolean done = false;
23
24   public SimpleProducer(String broker, String topic, String clientId)
25   {
26     this(broker, topic, clientId, null);
27   }
28
29   public SimpleProducer(String broker, String topic, String clientId, String transactionalId)
30   {
31     Properties props = new Properties();
32     props.put("bootstrap.servers", broker);
33     props.put("client.id", clientId); // Nur zur Wiedererkennung
34     if (transactionalId != null)
35       props.put("transactional.id", transactionalId); // Aktiviert außerdem enable.idempotence=true
36     props.put("key.serializer", StringSerializer.class.getName());
37     props.put("value.serializer", StringSerializer.class.getName());
38
39     this.id = clientId;
40     this.topic = topic;
41     producer = new KafkaProducer<>(props);
42     producer.initTransactions();
43   }
44
45   public void run()
46   {
47     long i = 0;
48
49     try
50     {
51       for (; running ; i++)
52       {
53         send(Long.toString(i%10), Long.toString(i));
54         Thread.sleep(500);
55       }
56     }
57     catch (InterruptedException e) {}
58     finally
59     {
60       log.info("{}: Closing the KafkaProducer", id);
61       producer.close();
62       log.info("{}: Produced {} messages in total, exiting!", id, produced);
63       done = true;
64     }
65   }
66
67   void begin()
68   {
69     producer.beginTransaction();
70   }
71
72   void commit()
73   {
74     producer.commitTransaction();
75   }
76
77   CompletableFuture<Long> send(String key, String value)
78   {
79     final CompletableFuture<Long> result = new CompletableFuture<>();
80     final long time = System.currentTimeMillis();
81
82     final ProducerRecord<String, String> record = new ProducerRecord<>(
83         topic,  // Topic
84         key,    // Key
85         value   // Value
86     );
87
88     producer.send(record, (metadata, e) ->
89     {
90       long now = System.currentTimeMillis();
91       if (e == null)
92       {
93         // HANDLE SUCCESS
94         produced++;
95         log.debug(
96             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
97             id,
98             record.key(),
99             record.value(),
100             metadata.partition(),
101             metadata.offset(),
102             metadata.timestamp(),
103             now - time
104         );
105         result.complete(metadata.offset());
106       }
107       else
108       {
109         // HANDLE ERROR
110         log.error(
111             "{} - ERROR key={} timestamp={} latency={}ms: {}",
112             id,
113             record.key(),
114             metadata == null ? -1 : metadata.timestamp(),
115             now - time,
116             e.toString()
117         );
118         result.completeExceptionally(e);
119       }
120     });
121
122     long now = System.currentTimeMillis();
123     log.trace(
124         "{} - Queued #{} key={} latency={}ms",
125         id,
126         value,
127         record.key(),
128         now - time
129     );
130
131     return result;
132   }
133
134
135   public static void main(String[] args) throws Exception
136   {
137     String broker = ":9092";
138     String topic = "test";
139     String clientId = "DEV";
140
141     switch (args.length)
142     {
143       case 3:
144         clientId = args[2];
145       case 2:
146         topic = args[1];
147       case 1:
148         broker = args[0];
149     }
150
151     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
152
153     Runtime.getRuntime().addShutdownHook(new Thread(() ->
154     {
155       instance.running = false;
156       while (!instance.done)
157       {
158         log.info("Waiting for main-thread...");
159         try
160         {
161           Thread.sleep(1000);
162         }
163         catch (InterruptedException e) {}
164       }
165       log.info("Shutdown completed.");
166     }));
167
168     log.info(
169         "Running SimpleProducer: broker={}, topic={}, client-id={}",
170         broker,
171         topic,
172         clientId);
173     instance.run();
174   }
175 }