Reorganisierten Code aus first-contact gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final String topic;
21   private final int throttleMs;
22   private final KafkaProducer<String, String> producer;
23
24   private boolean running = false;
25   private long i = 0;
26   private long produced = 0;
27   private Future<?> future = null;
28
29   public EndlessProducer(
30       ExecutorService executor,
31       String bootstrapServer,
32       String clientId,
33       String topic,
34       String acks,
35       int throttleMs)
36   {
37     this.executor = executor;
38     this.id = clientId;
39     this.topic = topic;
40     this.throttleMs = throttleMs;
41
42     Properties props = new Properties();
43     props.put("bootstrap.servers", bootstrapServer);
44     props.put("client.id", clientId);
45     props.put("acks", acks);
46     props.put("key.serializer", StringSerializer.class.getName());
47     props.put("value.serializer", StringSerializer.class.getName());
48
49     this.producer = new KafkaProducer<>(props);
50   }
51
52   @Override
53   public void run()
54   {
55     try
56     {
57       for (; running; i++)
58       {
59         send(Long.toString(i%10), Long.toString(i));
60
61         if (throttleMs > 0)
62         {
63           try
64           {
65             Thread.sleep(throttleMs);
66           }
67           catch (InterruptedException e)
68           {
69             log.warn("{} - Interrupted while throttling!", e);
70           }
71         }
72       }
73
74       log.info("{} - Done", id);
75     }
76     catch (Exception e)
77     {
78
79     }
80   }
81
82   void send(String key, String value)
83   {
84     final long time = System.currentTimeMillis();
85
86     final ProducerRecord<String, String> record = new ProducerRecord<>(
87         "test", // Topic
88         key,    // Key
89         value   // Value
90     );
91
92     producer.send(record, (metadata, e) ->
93     {
94       long now = System.currentTimeMillis();
95       if (e == null)
96       {
97         // HANDLE SUCCESS
98         produced++;
99         log.debug(
100             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
101             id,
102             record.key(),
103             record.value(),
104             metadata.partition(),
105             metadata.offset(),
106             metadata.timestamp(),
107             now - time
108         );
109       }
110       else
111       {
112         // HANDLE ERROR
113         log.error(
114             "{} - ERROR key={} timestamp={} latency={}ms: {}",
115             id,
116             record.key(),
117             metadata == null ? -1 : metadata.timestamp(),
118             now - time,
119             e.toString()
120         );
121       }
122     });
123
124     long now = System.currentTimeMillis();
125     log.trace(
126         "{} - Queued #{} key={} latency={}ms",
127         id,
128         value,
129         record.key(),
130         now - time
131     );
132   }
133
134   public synchronized void start()
135   {
136     if (running)
137       throw new IllegalStateException("Producer instance " + id + " is already running!");
138
139     log.info("{} - Starting - produced {} messages before", id, produced);
140     running = true;
141     future = executor.submit(this);
142   }
143
144   public synchronized void stop() throws ExecutionException, InterruptedException
145   {
146     if (!running)
147       throw new IllegalStateException("Producer instance " + id + " is not running!");
148
149     log.info("{} - Stopping...", id);
150     running = false;
151     future.get();
152     log.info("{} - Stopped - produced {} messages so far", id, produced);
153   }
154
155   @PreDestroy
156   public void destroy() throws ExecutionException, InterruptedException
157   {
158     log.info("{} - Destroy!", id);
159     try
160     {
161       stop();
162     }
163     catch (IllegalStateException e)
164     {
165       log.info("{} - Was already stopped", id);
166     }
167     finally
168     {
169       log.info("{} - Closing the KafkaProducer", id);
170       producer.close();
171       log.info("{}: Produced {} messages in total, exiting!", id, produced);
172     }
173   }
174 }