EndlessProducer arbeitet jetzt transaktional
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessProducer.java
index 7a5b324..a31652b 100644 (file)
@@ -17,12 +17,14 @@ public class EndlessProducer implements Runnable
   private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int commitIntervalMs;
   private final int throttleMs;
   private final KafkaProducer<String, String> producer;
 
   private boolean running = false;
   private long i = 0;
   private long produced = 0;
+  private long lastCommit;
 
   public EndlessProducer(
       ExecutorService executor,
@@ -30,15 +32,18 @@ public class EndlessProducer implements Runnable
       String clientId,
       String topic,
       String acks,
+      int commitIntervalMs,
       int throttleMs)
   {
     this.executor = executor;
     this.id = clientId;
     this.topic = topic;
+    this.commitIntervalMs = commitIntervalMs;
     this.throttleMs = throttleMs;
 
     Properties props = new Properties();
     props.put("bootstrap.servers", bootstrapServer);
+    props.put("transactional.id", clientId);
     props.put("client.id", clientId);
     props.put("acks", acks);
     props.put("key.serializer", StringSerializer.class.getName());
@@ -52,6 +57,12 @@ public class EndlessProducer implements Runnable
   {
     try
     {
+      producer.initTransactions();
+
+      lastCommit = System.currentTimeMillis();
+      log.info("{} - Beginning transaction", id);
+      producer.beginTransaction();
+
       for (; running; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
@@ -67,13 +78,27 @@ public class EndlessProducer implements Runnable
             log.warn("{} - Interrupted while throttling!", e);
           }
         }
+
+        long now = System.currentTimeMillis();
+        if (now - lastCommit >= commitIntervalMs)
+        {
+          log.info("{} - Commiting transaction", id);
+          producer.commitTransaction();
+          lastCommit = now;
+          log.info("{} - Beginning new transaction", id);
+          producer.beginTransaction();
+        }
       }
 
+      log.info("{} - Commiting transaction", id);
+      producer.commitTransaction();
       log.info("{} - Done", id);
     }
     catch (Exception e)
     {
       log.error("{} - Unexpected Exception:", id, e);
+      log.info("{} - Aborting transaction", id);
+      producer.abortTransaction();
     }
     finally
     {