WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10 import java.util.concurrent.CompletableFuture;
11
12
13 @Slf4j
14 public class SimpleProducer
15 {
16   private final String id;
17   private final String topic;
18   private final Producer<String, String> producer;
19
20   private long produced = 0;
21   private volatile boolean running = true;
22   private volatile boolean done = false;
23
24   public SimpleProducer(String broker, String topic, String clientId)
25   {
26     Properties props = new Properties();
27     props.put("bootstrap.servers", broker);
28     props.put("client.id", clientId); // Nur zur Wiedererkennung
29     props.put("transactional.id", clientId); // Aktiviert außerdem enable.idempotence=true
30     props.put("key.serializer", StringSerializer.class.getName());
31     props.put("value.serializer", StringSerializer.class.getName());
32
33     this.id = clientId;
34     this.topic = topic;
35     producer = new KafkaProducer<>(props);
36     producer.initTransactions();
37   }
38
39   public void run()
40   {
41     long i = 0;
42
43     try
44     {
45       for (; running ; i++)
46       {
47         send(Long.toString(i%10), Long.toString(i));
48         Thread.sleep(500);
49       }
50     }
51     catch (InterruptedException e) {}
52     finally
53     {
54       log.info("{}: Closing the KafkaProducer", id);
55       producer.close();
56       log.info("{}: Produced {} messages in total, exiting!", id, produced);
57       done = true;
58     }
59   }
60
61   void begin()
62   {
63     producer.beginTransaction();
64   }
65
66   void commit()
67   {
68     producer.commitTransaction();
69   }
70
71   CompletableFuture<Long> send(String key, String value)
72   {
73     final CompletableFuture<Long> result = new CompletableFuture<>();
74     final long time = System.currentTimeMillis();
75
76     final ProducerRecord<String, String> record = new ProducerRecord<>(
77         topic,  // Topic
78         key,    // Key
79         value   // Value
80     );
81
82     producer.send(record, (metadata, e) ->
83     {
84       long now = System.currentTimeMillis();
85       if (e == null)
86       {
87         // HANDLE SUCCESS
88         produced++;
89         log.debug(
90             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
91             id,
92             record.key(),
93             record.value(),
94             metadata.partition(),
95             metadata.offset(),
96             metadata.timestamp(),
97             now - time
98         );
99         result.complete(metadata.offset());
100       }
101       else
102       {
103         // HANDLE ERROR
104         log.error(
105             "{} - ERROR key={} timestamp={} latency={}ms: {}",
106             id,
107             record.key(),
108             metadata == null ? -1 : metadata.timestamp(),
109             now - time,
110             e.toString()
111         );
112         result.completeExceptionally(e);
113       }
114     });
115
116     long now = System.currentTimeMillis();
117     log.trace(
118         "{} - Queued #{} key={} latency={}ms",
119         id,
120         value,
121         record.key(),
122         now - time
123     );
124
125     return result;
126   }
127
128
129   public static void main(String[] args) throws Exception
130   {
131     String broker = ":9092";
132     String topic = "test";
133     String clientId = "DEV";
134
135     switch (args.length)
136     {
137       case 3:
138         clientId = args[2];
139       case 2:
140         topic = args[1];
141       case 1:
142         broker = args[0];
143     }
144
145     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
146
147     Runtime.getRuntime().addShutdownHook(new Thread(() ->
148     {
149       instance.running = false;
150       while (!instance.done)
151       {
152         log.info("Waiting for main-thread...");
153         try
154         {
155           Thread.sleep(1000);
156         }
157         catch (InterruptedException e) {}
158       }
159       log.info("Shutdown completed.");
160     }));
161
162     log.info(
163         "Running SimpleProducer: broker={}, topic={}, client-id={}",
164         broker,
165         topic,
166         clientId);
167     instance.run();
168   }
169 }