X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessProducer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessProducer.java;h=a31652b0d78a1eed2ce76d15a3d2ed990d4ab302;hb=8dfb868399595061e62bc929e0bf98e3420d0086;hp=7a5b324e144fddbaef84c036640e62448c6f7a4f;hpb=26cef68053cc7472055344b171a44f34e7592ebb;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..a31652b 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -17,12 +17,14 @@ public class EndlessProducer implements Runnable private final ExecutorService executor; private final String id; private final String topic; + private final int commitIntervalMs; private final int throttleMs; private final KafkaProducer producer; private boolean running = false; private long i = 0; private long produced = 0; + private long lastCommit; public EndlessProducer( ExecutorService executor, @@ -30,15 +32,18 @@ public class EndlessProducer implements Runnable String clientId, String topic, String acks, + int commitIntervalMs, int throttleMs) { this.executor = executor; this.id = clientId; this.topic = topic; + this.commitIntervalMs = commitIntervalMs; this.throttleMs = throttleMs; Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServer); + props.put("transactional.id", clientId); props.put("client.id", clientId); props.put("acks", acks); props.put("key.serializer", StringSerializer.class.getName()); @@ -52,6 +57,12 @@ public class EndlessProducer implements Runnable { try { + producer.initTransactions(); + + lastCommit = System.currentTimeMillis(); + log.info("{} - Beginning transaction", id); + producer.beginTransaction(); + for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); @@ -67,13 +78,27 @@ public class EndlessProducer implements Runnable log.warn("{} - Interrupted while throttling!", e); } } + + long now = System.currentTimeMillis(); + if (now - lastCommit >= commitIntervalMs) + { + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); + lastCommit = now; + log.info("{} - Beginning new transaction", id); + producer.beginTransaction(); + } } + log.info("{} - Commiting transaction", id); + producer.commitTransaction(); log.info("{} - Done", id); } catch (Exception e) { log.error("{} - Unexpected Exception:", id, e); + log.info("{} - Aborting transaction", id); + producer.abortTransaction(); } finally {