import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
@Slf4j
}
}
- void send(String key, String value)
+ CompletableFuture<Long> send(String key, String value)
{
+ final CompletableFuture<Long> result = new CompletableFuture<>();
final long time = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
metadata.timestamp(),
now - time
);
+ result.complete(metadata.offset());
}
else
{
now - time,
e.toString()
);
+ result.completeExceptionally(e);
}
});
record.key(),
now - time
);
+
+ return result;
}