import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
+import java.util.function.Consumer;
+
@Slf4j
@RequestMapping
@RequestBody Integer value)
{
DeferredResult<ProduceResult> result = new DeferredResult<>();
+ send(key, value, produceRsult -> result.setResult(produceRsult));
+ return result;
+ }
+ void send(
+ String key,
+ Integer value,
+ Consumer<ProduceResult> produceResultConsumer)
+ {
final long time = System.currentTimeMillis();
final ProducerRecord<String, Integer> record = new ProducerRecord<>(
{
// HANDLE SUCCESS
produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ produceResultConsumer.accept(new ProduceSuccess(metadata.partition(), metadata.offset()));
log.debug(
"{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
id,
else
{
// HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
+ produceResultConsumer.accept(new ProduceFailure(e));
log.error(
"{} - ERROR key={} timestamp={} latency={}ms: {}",
id,
record.key(),
now - time
);
-
- return result;
}
@ExceptionHandler