df5ff018f53712f53ffa78f9bcf321b9940cc72e
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.serialization.StringSerializer;
7
8 import javax.annotation.PreDestroy;
9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12
13
14 @Slf4j
15 public class EndlessProducer implements Runnable
16 {
17   private final ExecutorService executor;
18   private final String id;
19   private final String topic;
20   private final int throttleMs;
21   private final KafkaProducer<String, String> producer;
22
23   private boolean running = false;
24   private long i = 0;
25   private long produced = 0;
26
27   public EndlessProducer(
28       ExecutorService executor,
29       String bootstrapServer,
30       String clientId,
31       String topic,
32       String acks,
33       int throttleMs)
34   {
35     this.executor = executor;
36     this.id = clientId;
37     this.topic = topic;
38     this.throttleMs = throttleMs;
39
40     Properties props = new Properties();
41     props.put("bootstrap.servers", bootstrapServer);
42     props.put("client.id", clientId);
43     props.put("acks", acks);
44     props.put("key.serializer", StringSerializer.class.getName());
45     props.put("value.serializer", StringSerializer.class.getName());
46
47     this.producer = new KafkaProducer<>(props);
48   }
49
50   @Override
51   public void run()
52   {
53     try
54     {
55       for (; running; i++)
56       {
57         send(Long.toString(i%10), Long.toString(i));
58
59         if (throttleMs > 0)
60         {
61           try
62           {
63             Thread.sleep(throttleMs);
64           }
65           catch (InterruptedException e)
66           {
67             log.warn("{} - Interrupted while throttling!", e);
68           }
69         }
70       }
71
72       log.info("{} - Done", id);
73     }
74     catch (Exception e)
75     {
76       log.error("{} - Unexpected Exception:", id, e);
77     }
78     finally
79     {
80       synchronized (this)
81       {
82         running = false;
83         log.info("{} - Stopped - produced {} messages so far", id, produced);
84       }
85     }
86   }
87
88   void send(String key, String value)
89   {
90     // TODO: Die übergebene Nachricht versenden
91   }
92
93   public synchronized void start()
94   {
95     if (running)
96       throw new IllegalStateException("Producer instance " + id + " is already running!");
97
98     log.info("{} - Starting - produced {} messages before", id, produced);
99     running = true;
100     executor.submit(this);
101   }
102
103   public synchronized void stop() throws ExecutionException, InterruptedException
104   {
105     if (!running)
106       throw new IllegalStateException("Producer instance " + id + " is not running!");
107
108     log.info("{} - Stopping...", id);
109     running = false;
110   }
111
112   @PreDestroy
113   public void destroy() throws ExecutionException, InterruptedException
114   {
115     log.info("{} - Destroy!", id);
116     try
117     {
118       stop();
119     }
120     catch (IllegalStateException e)
121     {
122       log.info("{} - Was already stopped", id);
123     }
124     finally
125     {
126       log.info("{} - Closing the KafkaProducer", id);
127       producer.close();
128       log.info("{}: Produced {} messages in total, exiting!", id, produced);
129     }
130   }
131 }