From 865232914b174ba2e5996be699d05f3d1cca8d15 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 2 Jun 2023 14:42:18 +0200 Subject: [PATCH] Kafka-Spezifika in separater Methode von Spring-WebMVC getrennt --- src/main/java/de/juplo/kafka/RestGateway.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/RestGateway.java b/src/main/java/de/juplo/kafka/RestGateway.java index c50f465..8daf950 100644 --- a/src/main/java/de/juplo/kafka/RestGateway.java +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -8,6 +8,8 @@ import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; +import java.util.function.Consumer; + @Slf4j @RequestMapping @@ -29,7 +31,15 @@ public class RestGateway @RequestBody Integer value) { DeferredResult result = new DeferredResult<>(); + send(key, value, produceRsult -> result.setResult(produceRsult)); + return result; + } + void send( + String key, + Integer value, + Consumer produceResultConsumer) + { final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( @@ -46,7 +56,7 @@ public class RestGateway { // HANDLE SUCCESS produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + produceResultConsumer.accept(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -61,7 +71,7 @@ public class RestGateway else { // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); + produceResultConsumer.accept(new ProduceFailure(e)); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -80,8 +90,6 @@ public class RestGateway record.key(), now - time ); - - return result; } @ExceptionHandler -- 2.20.1