From: Kai Moritz Date: Fri, 2 Jun 2023 12:42:18 +0000 (+0200) Subject: Kafka-Spezifika in separater Methode von Spring-WebMVC getrennt X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsumup-gateway;p=demos%2Fkafka%2Ftraining Kafka-Spezifika in separater Methode von Spring-WebMVC getrennt --- diff --git a/src/main/java/de/juplo/kafka/RestGateway.java b/src/main/java/de/juplo/kafka/RestGateway.java index c50f465..8daf950 100644 --- a/src/main/java/de/juplo/kafka/RestGateway.java +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -8,6 +8,8 @@ import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; +import java.util.function.Consumer; + @Slf4j @RequestMapping @@ -29,7 +31,15 @@ public class RestGateway @RequestBody Integer value) { DeferredResult result = new DeferredResult<>(); + send(key, value, produceRsult -> result.setResult(produceRsult)); + return result; + } + void send( + String key, + Integer value, + Consumer produceResultConsumer) + { final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( @@ -46,7 +56,7 @@ public class RestGateway { // HANDLE SUCCESS produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + produceResultConsumer.accept(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -61,7 +71,7 @@ public class RestGateway else { // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); + produceResultConsumer.accept(new ProduceFailure(e)); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -80,8 +90,6 @@ public class RestGateway record.key(), now - time ); - - return result; } @ExceptionHandler