Spring-Version des Endless-Stream-Producer's
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final int throttleMs;
21   private final KafkaTemplate<String, String> kafkaTemplate;
22
23   private boolean running = false;
24   private long i = 0;
25   private long produced = 0;
26
27   public EndlessProducer(
28       ExecutorService executor,
29       String clientId,
30       int throttleMs,
31       KafkaTemplate<String, String> kafkaTemplate)
32   {
33     this.executor = executor;
34     this.id = clientId;
35     this.throttleMs = throttleMs;
36     this.kafkaTemplate = kafkaTemplate;
37   }
38
39   @Override
40   public void run()
41   {
42     try
43     {
44       for (; running; i++)
45       {
46         send(Long.toString(i%10), Long.toString(i));
47
48         if (throttleMs > 0)
49         {
50           try
51           {
52             Thread.sleep(throttleMs);
53           }
54           catch (InterruptedException e)
55           {
56             log.warn("{} - Interrupted while throttling!", e);
57           }
58         }
59       }
60
61       log.info("{} - Done", id);
62     }
63     catch (Exception e)
64     {
65       log.error("{} - Unexpected Exception:", id, e);
66     }
67     finally
68     {
69       synchronized (this)
70       {
71         running = false;
72         log.info("{} - Stopped - produced {} messages so far", id, produced);
73       }
74     }
75   }
76
77   void send(String key, String value)
78   {
79     final long time = System.currentTimeMillis();
80
81     ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.sendDefault(key, value);
82     listenableFuture.addCallback(
83       result ->
84       {
85         long now = System.currentTimeMillis();
86         RecordMetadata metadata = result.getRecordMetadata();
87         ProducerRecord<String, String> record = result.getProducerRecord();
88
89         // HANDLE SUCCESS
90         produced++;
91         log.debug(
92             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
93             id,
94             record.key(),
95             record.value(),
96             metadata.partition(),
97             metadata.offset(),
98             metadata.timestamp(),
99             now - time
100         );
101       },
102       e ->
103       {
104         long now = System.currentTimeMillis();
105
106         // HANDLE ERROR
107         log.error(
108             "{} - ERROR key={} latency={}ms: {}",
109             id,
110             key,
111             now - time,
112             e.toString()
113         );
114       });
115
116     long now = System.currentTimeMillis();
117     log.trace(
118         "{} - Queued #{} key={} latency={}ms",
119         id,
120         value,
121         key,
122         now - time
123     );
124   }
125
126   public synchronized void start()
127   {
128     if (running)
129       throw new IllegalStateException("Producer instance " + id + " is already running!");
130
131     log.info("{} - Starting - produced {} messages before", id, produced);
132     running = true;
133     executor.submit(this);
134   }
135
136   public synchronized void stop() throws ExecutionException, InterruptedException
137   {
138     if (!running)
139       throw new IllegalStateException("Producer instance " + id + " is not running!");
140
141     log.info("{} - Stopping...", id);
142     running = false;
143   }
144
145   @PreDestroy
146   public void destroy() throws ExecutionException, InterruptedException
147   {
148     log.info("{} - Destroy!", id);
149     try
150     {
151       stop();
152     }
153     catch (IllegalStateException e)
154     {
155       log.info("{} - Was already stopped", id);
156     }
157     finally
158     {
159       log.info("{}: Produced {} messages in total, exiting!", id, produced);
160     }
161   }
162 }