Reorganisierten Code aus first-contact gemerged
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
index 357e07a..cc3150e 100644 (file)
@@ -5,54 +5,77 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import javax.annotation.PreDestroy;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 
 @Slf4j
-public class SimpleProducer
+public class EndlessProducer implements Runnable
 {
+  private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int throttleMs;
   private final KafkaProducer<String, String> producer;
 
+  private boolean running = false;
+  private long i = 0;
   private long produced = 0;
-
-  public SimpleProducer(String clientId, String topic)
+  private Future<?> future = null;
+
+  public EndlessProducer(
+      ExecutorService executor,
+      String bootstrapServer,
+      String clientId,
+      String topic,
+      String acks,
+      int throttleMs)
   {
-    // tag::create[]
+    this.executor = executor;
+    this.id = clientId;
+    this.topic = topic;
+    this.throttleMs = throttleMs;
+
     Properties props = new Properties();
-    props.put("bootstrap.servers", "localhost:9092");
+    props.put("bootstrap.servers", bootstrapServer);
+    props.put("client.id", clientId);
+    props.put("acks", acks);
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    // end::create[]
-
-    this.id = clientId;
-    this.topic = topic;
-    this.producer = producer;
+    this.producer = new KafkaProducer<>(props);
   }
 
+  @Override
   public void run()
   {
-    long i = 0;
-
     try
     {
-      for (; i < 100 ; i++)
+      for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
+
+        if (throttleMs > 0)
+        {
+          try
+          {
+            Thread.sleep(throttleMs);
+          }
+          catch (InterruptedException e)
+          {
+            log.warn("{} - Interrupted while throttling!", e);
+          }
+        }
       }
 
       log.info("{} - Done", id);
     }
-    finally
+    catch (Exception e)
     {
-      log.info("{}: Closing the KafkaProducer", id);
-      producer.close();
-      log.info("{}: Produced {} messages in total, exiting!", id, produced);
+
     }
   }
 
@@ -108,10 +131,44 @@ public class SimpleProducer
     );
   }
 
+  public synchronized void start()
+  {
+    if (running)
+      throw new IllegalStateException("Producer instance " + id + " is already running!");
+
+    log.info("{} - Starting - produced {} messages before", id, produced);
+    running = true;
+    future = executor.submit(this);
+  }
+
+  public synchronized void stop() throws ExecutionException, InterruptedException
+  {
+    if (!running)
+      throw new IllegalStateException("Producer instance " + id + " is not running!");
+
+    log.info("{} - Stopping...", id);
+    running = false;
+    future.get();
+    log.info("{} - Stopped - produced {} messages so far", id, produced);
+  }
 
-  public static void main(String[] args) throws Exception
+  @PreDestroy
+  public void destroy() throws ExecutionException, InterruptedException
   {
-    SimpleProducer producer = new SimpleProducer("P", "test");
-    producer.run();
+    log.info("{} - Destroy!", id);
+    try
+    {
+      stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("{} - Was already stopped", id);
+    }
+    finally
+    {
+      log.info("{} - Closing the KafkaProducer", id);
+      producer.close();
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
+    }
   }
 }