WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
index 6545e29..f5d41cb 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -11,13 +12,15 @@ import org.springframework.http.ResponseEntity;
 import java.net.URI;
 import java.util.UUID;
 
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
 
 @Slf4j
 @RequiredArgsConstructor
 public class TransferService
 {
   private final TransferRepository repository;
-  private final KafkaProducer<UUID, String> producer;
+  private final KafkaProducer<String, String> producer;
   private final ObjectMapper mapper;
   private final String topic;
 
@@ -28,45 +31,55 @@ public class TransferService
         .ifPresentOrElse(
             stored ->
             {
-              switch (stored.getState())
+              if (!transfer.equals(stored))
+                throw new IllegalArgumentException(
+                    "Re-Initiation of transfer with different data: old=" +
+                        stored +
+                        ", new=" +
+                        transfer);
+
+              if (stored.getState() == FAILED)
               {
-                case FAILED:
+                repository.update(transfer.getId(), FAILED, SENT);
+                log.info("Resending faild transfer: " + stored);
+                send(transfer);
               }
             },
             () ->
             {
+              repository.store(transfer);
+              send(transfer);
             });
   }
 
 
-  private void create(Transfer transfer)
+  private void send(Transfer transfer)
   {
     try
     {
-      ProducerRecord<UUID, String> record =
+      ProducerRecord<String, String> record =
           new ProducerRecord<>(
               topic,
-              transfer.getId(),
+              Long.toString(transfer.getId()),
               mapper.writeValueAsString(transfer));
 
       producer.send(record, (metadata, exception) ->
       {
-        if (exception != null)
+        if (metadata != null)
         {
-          log.error("Could not place order {}: {}", transfer, exception.toString());
-          result.setErrorResult(exception);
-          return;
+          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          repository.update(transfer.getId(), SENT, PENDING);
+        }
+        else
+        {
+          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          repository.update(transfer.getId(), SENT, FAILED);
         }
-
-        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
       });
     }
-    catch (Exception e)
+    catch (JsonProcessingException e)
     {
-      log.error("Unexpected exception!", e);
-      result.setErrorResult(e);
+      throw new RuntimeException("Could not convert " + transfer, e);
     }
-
-    return result;
   }
 }