+
+ long now = System.currentTimeMillis();
+ if (now - lastCommit >= commitIntervalMs)
+ {
+ log.info("{} - Commiting transaction", id);
+ producer.commitTransaction();
+ lastCommit = now;
+ log.info("{} - Beginning new transaction", id);
+ producer.beginTransaction();
+ }