Round Robin: the producer uses round-robin for partitioning
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Future;
13
14
15 @Slf4j
16 public class EndlessProducer implements Runnable
17 {
18   private final ExecutorService executor;
19   private final String id;
20   private final String topic;
21   private final String acks;
22   private final int throttleMs;
23   private final KafkaProducer<String, String> producer;
24
25   private boolean running = false;
26   private long i = 0;
27   private long produced = 0;
28   private Future<?> future = null;
29
30   public EndlessProducer(
31       ExecutorService executor,
32       String bootstrapServer,
33       String clientId,
34       String topic,
35       String acks,
36       int throttleMs)
37   {
38     this.executor = executor;
39     this.id = clientId;
40     this.topic = topic;
41     this.acks = acks;
42     this.throttleMs = throttleMs;
43
44     Properties props = new Properties();
45     props.put("bootstrap.servers", bootstrapServer);
46     props.put("client.id", clientId);
47     props.put("acks", acks);
48     props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");
49     props.put("key.serializer", StringSerializer.class.getName());
50     props.put("value.serializer", StringSerializer.class.getName());
51
52     this.producer = new KafkaProducer<>(props);
53   }
54
55   @Override
56   public void run()
57   {
58     try
59     {
60       for (; running; i++)
61       {
62         final long time = System.currentTimeMillis();
63
64         final ProducerRecord<String, String> record = new ProducerRecord<>(
65             topic,                 // Topic
66             Long.toString(i % 10), // Key
67             Long.toString(i)       // Value
68         );
69
70         producer.send(record, (metadata, e) ->
71         {
72           long now = System.currentTimeMillis();
73           if (e == null)
74           {
75             // HANDLE SUCCESS
76             produced++;
77             log.debug(
78                 "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
79                 id,
80                 record.key(),
81                 record.value(),
82                 metadata.partition(),
83                 metadata.offset(),
84                 metadata.timestamp(),
85                 now - time
86             );
87           }
88           else
89           {
90             // HANDLE ERROR
91             log.error(
92                 "{} - ERROR key={} timestamp={} latency={}ms: {}",
93                 id,
94                 record.key(),
95                 metadata == null ? -1 : metadata.timestamp(),
96                 now - time,
97                 e.toString()
98             );
99           }
100         });
101
102         long now = System.currentTimeMillis();
103         log.trace(
104             "{} - Queued #{} key={} latency={}ms",
105             id,
106             i,
107             record.key(),
108             now - time
109         );
110
111         if (throttleMs > 0)
112         {
113           try
114           {
115             Thread.sleep(throttleMs);
116           }
117           catch (InterruptedException e)
118           {
119             log.warn("{} - Interrupted while throttling!", e);
120           }
121         }
122       }
123
124       log.info("{} - Done", id);
125     }
126     catch (Exception e)
127     {
128
129     }
130   }
131
132   public synchronized void start()
133   {
134     if (running)
135       throw new IllegalStateException("Producer instance " + id + " is already running!");
136
137     log.info("{} - Starting - produced {} messages before", id, produced);
138     running = true;
139     future = executor.submit(this);
140   }
141
142   public synchronized void stop() throws ExecutionException, InterruptedException
143   {
144     if (!running)
145       throw new IllegalStateException("Producer instance " + id + " is not running!");
146
147     log.info("{} - Stopping...", id);
148     running = false;
149     future.get();
150     log.info("{} - Stopped - produced {} messages so far", id, produced);
151   }
152
153   @PreDestroy
154   public void destroy() throws ExecutionException, InterruptedException
155   {
156     log.info("{} - Destroy!", id);
157     try
158     {
159       stop();
160     }
161     catch (IllegalStateException e)
162     {
163       log.info("{} - Was already stopped", id);
164     }
165     finally
166     {
167       log.info("{} - Closing the KafkaProducer", id);
168       producer.close();
169       log.info("{}: Produced {} messages in total, exiting!", id, produced);
170     }
171   }
172 }