producer.send(record, ((metadata, exception) ->
{
- if (metadata != null)
+ if (exception == null)
{
// On successful send
Message message = new Message(key, metadata.offset(), timestamp, text);
currentOffset);
consumer.seek(topicPartition, nextOffset[partition]);
+ infoChannel.sendShardAssignedEvent(partition);
});
consumer.resume(partitions);
int partition = topicPartition.partition();
isShardOwned[partition] = false;
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ infoChannel.sendShardRevokedEvent(partition);
});
}