2b5b441c1683e8d774a5a6d16865a94dfa785d18
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.Producer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import java.util.Properties;
10 import java.util.concurrent.CompletableFuture;
11
12
13 @Slf4j
14 public class SimpleProducer
15 {
16   private final String id;
17   private final String topic;
18   private final Producer<String, String> producer;
19
20   private long produced = 0;
21   private volatile boolean running = true;
22   private volatile boolean done = false;
23
24   public SimpleProducer(String broker, String topic, String clientId)
25   {
26     Properties props = new Properties();
27     props.put("bootstrap.servers", broker);
28     props.put("client.id", clientId); // Nur zur Wiedererkennung
29     props.put("transactional.id", clientId); // Aktiviert außerdem enable.idempotence=true
30     props.put("key.serializer", StringSerializer.class.getName());
31     props.put("value.serializer", StringSerializer.class.getName());
32
33     this.id = clientId;
34     this.topic = topic;
35     producer = new KafkaProducer<>(props);
36   }
37
38   public void run()
39   {
40     long i = 0;
41
42     try
43     {
44       for (; running ; i++)
45       {
46         send(Long.toString(i%10), Long.toString(i));
47         Thread.sleep(500);
48       }
49     }
50     catch (InterruptedException e) {}
51     finally
52     {
53       log.info("{}: Closing the KafkaProducer", id);
54       producer.close();
55       log.info("{}: Produced {} messages in total, exiting!", id, produced);
56       done = true;
57     }
58   }
59
60   CompletableFuture<Long> send(String key, String value)
61   {
62     final CompletableFuture<Long> result = new CompletableFuture<>();
63     final long time = System.currentTimeMillis();
64
65     final ProducerRecord<String, String> record = new ProducerRecord<>(
66         topic,  // Topic
67         key,    // Key
68         value   // Value
69     );
70
71     producer.send(record, (metadata, e) ->
72     {
73       long now = System.currentTimeMillis();
74       if (e == null)
75       {
76         // HANDLE SUCCESS
77         produced++;
78         log.debug(
79             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
80             id,
81             record.key(),
82             record.value(),
83             metadata.partition(),
84             metadata.offset(),
85             metadata.timestamp(),
86             now - time
87         );
88         result.complete(metadata.offset());
89       }
90       else
91       {
92         // HANDLE ERROR
93         log.error(
94             "{} - ERROR key={} timestamp={} latency={}ms: {}",
95             id,
96             record.key(),
97             metadata == null ? -1 : metadata.timestamp(),
98             now - time,
99             e.toString()
100         );
101         result.completeExceptionally(e);
102       }
103     });
104
105     long now = System.currentTimeMillis();
106     log.trace(
107         "{} - Queued #{} key={} latency={}ms",
108         id,
109         value,
110         record.key(),
111         now - time
112     );
113
114     return result;
115   }
116
117
118   public static void main(String[] args) throws Exception
119   {
120     String broker = ":9092";
121     String topic = "test";
122     String clientId = "DEV";
123
124     switch (args.length)
125     {
126       case 3:
127         clientId = args[2];
128       case 2:
129         topic = args[1];
130       case 1:
131         broker = args[0];
132     }
133
134     SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
135
136     Runtime.getRuntime().addShutdownHook(new Thread(() ->
137     {
138       instance.running = false;
139       while (!instance.done)
140       {
141         log.info("Waiting for main-thread...");
142         try
143         {
144           Thread.sleep(1000);
145         }
146         catch (InterruptedException e) {}
147       }
148       log.info("Shutdown completed.");
149     }));
150
151     log.info(
152         "Running SimpleProducer: broker={}, topic={}, client-id={}",
153         broker,
154         topic,
155         clientId);
156     instance.run();
157   }
158 }