Version des `EndlessProducer`, der Nachrichten vom Typ `Long` schreibt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.LongSerializer;
7 import org.apache.kafka.common.serialization.StringSerializer;
8
9 import javax.annotation.PreDestroy;
10 import java.util.Properties;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.ExecutorService;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final String topic;
21   private final int throttleMs;
22   private final KafkaProducer<String, Long> producer;
23
24   private boolean running = false;
25   private long i = 0;
26   private long produced = 0;
27
28   public EndlessProducer(
29       ExecutorService executor,
30       String bootstrapServer,
31       String clientId,
32       String topic,
33       String acks,
34       int throttleMs)
35   {
36     this.executor = executor;
37     this.id = clientId;
38     this.topic = topic;
39     this.throttleMs = throttleMs;
40
41     Properties props = new Properties();
42     props.put("bootstrap.servers", bootstrapServer);
43     props.put("client.id", clientId);
44     props.put("acks", acks);
45     props.put("key.serializer", StringSerializer.class.getName());
46     props.put("value.serializer", LongSerializer.class.getName());
47
48     this.producer = new KafkaProducer<>(props);
49   }
50
51   @Override
52   public void run()
53   {
54     try
55     {
56       for (; running; i++)
57       {
58         send(Long.toString(i%10), i);
59
60         if (throttleMs > 0)
61         {
62           try
63           {
64             Thread.sleep(throttleMs);
65           }
66           catch (InterruptedException e)
67           {
68             log.warn("{} - Interrupted while throttling!", e);
69           }
70         }
71       }
72
73       log.info("{} - Done", id);
74     }
75     catch (Exception e)
76     {
77       log.error("{} - Unexpected Exception:", id, e);
78     }
79     finally
80     {
81       synchronized (this)
82       {
83         running = false;
84         log.info("{} - Stopped - produced {} messages so far", id, produced);
85       }
86     }
87   }
88
89   void send(String key, Long value)
90   {
91     final long time = System.currentTimeMillis();
92
93     final ProducerRecord<String, Long> record = new ProducerRecord<>(
94         topic,  // Topic
95         key,    // Key
96         value   // Value
97     );
98
99     producer.send(record, (metadata, e) ->
100     {
101       long now = System.currentTimeMillis();
102       if (e == null)
103       {
104         // HANDLE SUCCESS
105         produced++;
106         log.debug(
107             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
108             id,
109             record.key(),
110             record.value(),
111             metadata.partition(),
112             metadata.offset(),
113             metadata.timestamp(),
114             now - time
115         );
116       }
117       else
118       {
119         // HANDLE ERROR
120         log.error(
121             "{} - ERROR key={} timestamp={} latency={}ms: {}",
122             id,
123             record.key(),
124             metadata == null ? -1 : metadata.timestamp(),
125             now - time,
126             e.toString()
127         );
128       }
129     });
130
131     long now = System.currentTimeMillis();
132     log.trace(
133         "{} - Queued #{} key={} latency={}ms",
134         id,
135         value,
136         record.key(),
137         now - time
138     );
139   }
140
141   public synchronized void start()
142   {
143     if (running)
144       throw new IllegalStateException("Producer instance " + id + " is already running!");
145
146     log.info("{} - Starting - produced {} messages before", id, produced);
147     running = true;
148     executor.submit(this);
149   }
150
151   public synchronized void stop() throws ExecutionException, InterruptedException
152   {
153     if (!running)
154       throw new IllegalStateException("Producer instance " + id + " is not running!");
155
156     log.info("{} - Stopping...", id);
157     running = false;
158   }
159
160   @PreDestroy
161   public void destroy() throws ExecutionException, InterruptedException
162   {
163     log.info("{} - Destroy!", id);
164     try
165     {
166       stop();
167     }
168     catch (IllegalStateException e)
169     {
170       log.info("{} - Was already stopped", id);
171     }
172     finally
173     {
174       log.info("{} - Closing the KafkaProducer", id);
175       producer.close();
176       log.info("{}: Produced {} messages in total, exiting!", id, produced);
177     }
178   }
179 }