EndlessProducer arbeitet jetzt transaktional
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
index cc3150e..a31652b 100644 (file)
@@ -9,7 +9,6 @@ import javax.annotation.PreDestroy;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 
 @Slf4j
@@ -18,13 +17,14 @@ public class EndlessProducer implements Runnable
   private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int commitIntervalMs;
   private final int throttleMs;
   private final KafkaProducer<String, String> producer;
 
   private boolean running = false;
   private long i = 0;
   private long produced = 0;
-  private Future<?> future = null;
+  private long lastCommit;
 
   public EndlessProducer(
       ExecutorService executor,
@@ -32,15 +32,18 @@ public class EndlessProducer implements Runnable
       String clientId,
       String topic,
       String acks,
+      int commitIntervalMs,
       int throttleMs)
   {
     this.executor = executor;
     this.id = clientId;
     this.topic = topic;
+    this.commitIntervalMs = commitIntervalMs;
     this.throttleMs = throttleMs;
 
     Properties props = new Properties();
     props.put("bootstrap.servers", bootstrapServer);
+    props.put("transactional.id", clientId);
     props.put("client.id", clientId);
     props.put("acks", acks);
     props.put("key.serializer", StringSerializer.class.getName());
@@ -54,6 +57,12 @@ public class EndlessProducer implements Runnable
   {
     try
     {
+      producer.initTransactions();
+
+      lastCommit = System.currentTimeMillis();
+      log.info("{} - Beginning transaction", id);
+      producer.beginTransaction();
+
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
@@ -69,13 +78,35 @@ public class EndlessProducer implements Runnable
             log.warn("{} - Interrupted while throttling!", e);
           }
         }
+
+        long now = System.currentTimeMillis();
+        if (now - lastCommit >= commitIntervalMs)
+        {
+          log.info("{} - Commiting transaction", id);
+          producer.commitTransaction();
+          lastCommit = now;
+          log.info("{} - Beginning new transaction", id);
+          producer.beginTransaction();
+        }
       }
 
+      log.info("{} - Commiting transaction", id);
+      producer.commitTransaction();
       log.info("{} - Done", id);
     }
     catch (Exception e)
     {
-
+      log.error("{} - Unexpected Exception:", id, e);
+      log.info("{} - Aborting transaction", id);
+      producer.abortTransaction();
+    }
+    finally
+    {
+      synchronized (this)
+      {
+        running = false;
+        log.info("{} - Stopped - produced {} messages so far", id, produced);
+      }
     }
   }
 
@@ -84,7 +115,7 @@ public class EndlessProducer implements Runnable
     final long time = System.currentTimeMillis();
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
-        "test", // Topic
+        topic,  // Topic
         key,    // Key
         value   // Value
     );
@@ -138,7 +169,7 @@ public class EndlessProducer implements Runnable
 
     log.info("{} - Starting - produced {} messages before", id, produced);
     running = true;
-    future = executor.submit(this);
+    executor.submit(this);
   }
 
   public synchronized void stop() throws ExecutionException, InterruptedException
@@ -148,8 +179,6 @@ public class EndlessProducer implements Runnable
 
     log.info("{} - Stopping...", id);
     running = false;
-    future.get();
-    log.info("{} - Stopped - produced {} messages so far", id, produced);
   }
 
   @PreDestroy