currentOffset);
consumer.seek(topicPartition, nextOffset[partition]);
+ infoChannel.sendShardAssignedEvent(partition);
});
consumer.resume(partitions);
int partition = topicPartition.partition();
isShardOwned[partition] = false;
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+ infoChannel.sendShardRevokedEvent(partition);
});
}