void sendCounterState(int partition, String key, Long counter)
{
seen[partition]++;
- ProducerRecord<String, String> record = new ProducerRecord<>(stateTopic, key, counter.toString());
- producer.send(record, ((metadata, exception) ->
+
+ final long time = System.currentTimeMillis();
+
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ stateTopic, // Topic
+ key, // Key
+ counter.toString() // Value
+ );
+
+ producer.send(record, (metadata, e) ->
{
- if (exception == null)
+ long now = System.currentTimeMillis();
+ if (e == null)
{
- acked[partition]++;
- if (done[partition] && !(acked[partition] < seen[partition]))
- {
- phaser.arrive();
- }
+ // HANDLE SUCCESS
+ log.debug(
+ "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
}
else
{
- // Errors are ignored (for now):
- // The next occurrence of the key will issue a new update of the counter state
- log.error("{} - {}", id, exception.toString());
+ // HANDLE ERROR
+ log.error(
+ "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
+ id,
+ record.key(),
+ record.value(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+
+ acked[partition]++;
+ if (done[partition] && !(acked[partition] < seen[partition]))
+ {
+ phaser.arrive();
}
- }));
+ });
+
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued message {}={}, latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ now - time
+ );
}
@Override