import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.UUID;
-import java.util.concurrent.Future;
@Slf4j
{
if (metadata != null)
{
+ // On successful send
Message message = messages.get(key);
if (message != null)
{
sink.success();
}
+ else
+ {
+ // On send-failure
+ sink.error(exception);
+ }
}));
});
}