Kafka-Spezifika in separater Methode von Spring-WebMVC getrennt sumup-gateway
authorKai Moritz <kai@juplo.de>
Fri, 2 Jun 2023 12:42:18 +0000 (14:42 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 2 Jun 2023 12:42:18 +0000 (14:42 +0200)
src/main/java/de/juplo/kafka/RestGateway.java

index c50f465..8daf950 100644 (file)
@@ -8,6 +8,8 @@ import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
+import java.util.function.Consumer;
+
 
 @Slf4j
 @RequestMapping
@@ -29,7 +31,15 @@ public class RestGateway
       @RequestBody Integer value)
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
+    send(key, value, produceRsult -> result.setResult(produceRsult));
+    return result;
+  }
 
+  void send(
+      String key,
+      Integer value,
+      Consumer<ProduceResult> produceResultConsumer)
+  {
     final long time = System.currentTimeMillis();
 
     final ProducerRecord<String, Integer> record = new ProducerRecord<>(
@@ -46,7 +56,7 @@ public class RestGateway
       {
         // HANDLE SUCCESS
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+        produceResultConsumer.accept(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
@@ -61,7 +71,7 @@ public class RestGateway
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
+        produceResultConsumer.accept(new ProduceFailure(e));
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
@@ -80,8 +90,6 @@ public class RestGateway
         record.key(),
         now - time
     );
-
-    return result;
   }
 
   @ExceptionHandler