WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
index 34ef29c..d708826 100644 (file)
@@ -1,15 +1,16 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.ResponseEntity;
 
-import java.net.URI;
-import java.util.UUID;
+import java.util.Optional;
+
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
 
 
 @Slf4j
@@ -17,49 +18,72 @@ import java.util.UUID;
 public class TransferService
 {
   private final TransferRepository repository;
-  private final KafkaProducer<UUID, String> producer;
+  private final KafkaProducer<String, String> producer;
   private final ObjectMapper mapper;
   private final String topic;
 
-  public void initiate(Transfer transfer)
+  public synchronized void initiate(Transfer transfer)
   {
     repository
         .get(transfer.getId())
         .ifPresentOrElse(
             stored ->
             {
+              if (!transfer.equals(stored))
+                throw new IllegalArgumentException(
+                    "Re-Initiation of transfer with different data: old=" +
+                        stored +
+                        ", new=" +
+                        transfer);
 
+              if (stored.getState() == FAILED)
+              {
+                repository.update(transfer.getId(), FAILED, SENT);
+                log.info("Resending faild transfer: " + stored);
+                send(transfer);
+              }
             },
             () ->
             {
-
+              send(transfer);
+              transfer.setState(SENT);
+              repository.store(transfer);
             });
+  }
+
+
+  private void send(Transfer transfer)
+  {
     try
     {
-      ProducerRecord<UUID, String> record =
+      ProducerRecord<String, String> record =
           new ProducerRecord<>(
               topic,
-              transfer.getId(),
+              Long.toString(transfer.getId()),
               mapper.writeValueAsString(transfer));
 
       producer.send(record, (metadata, exception) ->
       {
-        if (exception != null)
+        if (metadata != null)
         {
-          log.error("Could not place order {}: {}", transfer, exception.toString());
-          result.setErrorResult(exception);
-          return;
+          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          repository.update(transfer.getId(), SENT, PENDING);
+        }
+        else
+        {
+          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          repository.update(transfer.getId(), SENT, FAILED);
         }
-
-        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
       });
     }
-    catch (Exception e)
+    catch (JsonProcessingException e)
     {
-      log.error("Unexpected exception!", e);
-      result.setErrorResult(e);
+      throw new RuntimeException("Could not convert " + transfer, e);
     }
+  }
 
-    return result;
+  public Optional<Transfer> get(Long id)
+  {
+    return repository.get(id);
   }
 }