final long time = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
+ topic, // Topic
+ key, // Key
+ value // Value
);
producer.send(record, (metadata, e) ->
// HANDLE SUCCESS
produced++;
log.debug(
- "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
- id,
- key,
- value,
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
+ "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
+ id,
+ key,
+ value,
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
);
}
else
{
// HANDLE ERROR
log.error(
- "{} - ERROR for message {}={}, latency={}ms: {}",
- id,
- key,
- value,
- now - time,
- e.toString()
+ "{} - ERROR for message {}={}, latency={}ms: {}",
+ id,
+ key,
+ value,
+ now - time,
+ e.toString()
);
}
});
long now = System.currentTimeMillis();
log.trace(
- "{} - Queued message {}={}, latency={}ms",
- id,
- key,
- value,
- now - time
+ "{} - Queued message {}={}, latency={}ms",
+ id,
+ key,
+ value,
+ now - time
);
}
}));
log.info(
- "Running ExampleProducer: broker={}, topic={}, client-id={}",
- broker,
- topic,
- clientId);
+ "Running ExampleProducer: broker={}, topic={}, client-id={}",
+ broker,
+ topic,
+ clientId);
instance.run();
}
}