WIP
authorKai Moritz <kai@juplo.de>
Sat, 12 Jun 2021 08:47:29 +0000 (10:47 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 12 Jun 2021 08:47:29 +0000 (10:47 +0200)
transfer/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferRepository.java
transfer/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java

index cc3e056..e439068 100644 (file)
@@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.UUIDSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -24,11 +23,11 @@ import java.util.UUID;
 public class TransferServiceApplication
 {
   @Bean(destroyMethod = "close")
-  KafkaProducer<UUID, String> producer(TransferServiceProperties properties)
+  KafkaProducer<String, String> producer(TransferServiceProperties properties)
   {
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
     return new KafkaProducer<>(props);
index 2aa87fe..878a233 100644 (file)
@@ -2,16 +2,18 @@ package de.juplo.kafka.payment.transfer.domain;
 
 
 import lombok.Builder;
-import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Value;
 
 
-@Data
+@Value
 @Builder
+@EqualsAndHashCode(exclude = "state")
 public class Transfer
 {
   public enum State
   {
-    CREATED,
+    SENT,
     FAILED,
     PENDING,
     APPROVED,
index 908caa1..36d027c 100644 (file)
@@ -1,13 +1,15 @@
 package de.juplo.kafka.payment.transfer.domain;
 
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-
 import java.util.Optional;
-import java.util.UUID;
 
 
 public interface TransferRepository
 {
-  void put(Transfer transfer);
+  void store(Transfer transfer);
+
   Optional<Transfer> get(Long id);
+
+  void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
+
+  void remove(Long id);
 }
index 6545e29..f5d41cb 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -11,13 +12,15 @@ import org.springframework.http.ResponseEntity;
 import java.net.URI;
 import java.util.UUID;
 
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
 
 @Slf4j
 @RequiredArgsConstructor
 public class TransferService
 {
   private final TransferRepository repository;
-  private final KafkaProducer<UUID, String> producer;
+  private final KafkaProducer<String, String> producer;
   private final ObjectMapper mapper;
   private final String topic;
 
@@ -28,45 +31,55 @@ public class TransferService
         .ifPresentOrElse(
             stored ->
             {
-              switch (stored.getState())
+              if (!transfer.equals(stored))
+                throw new IllegalArgumentException(
+                    "Re-Initiation of transfer with different data: old=" +
+                        stored +
+                        ", new=" +
+                        transfer);
+
+              if (stored.getState() == FAILED)
               {
-                case FAILED:
+                repository.update(transfer.getId(), FAILED, SENT);
+                log.info("Resending faild transfer: " + stored);
+                send(transfer);
               }
             },
             () ->
             {
+              repository.store(transfer);
+              send(transfer);
             });
   }
 
 
-  private void create(Transfer transfer)
+  private void send(Transfer transfer)
   {
     try
     {
-      ProducerRecord<UUID, String> record =
+      ProducerRecord<String, String> record =
           new ProducerRecord<>(
               topic,
-              transfer.getId(),
+              Long.toString(transfer.getId()),
               mapper.writeValueAsString(transfer));
 
       producer.send(record, (metadata, exception) ->
       {
-        if (exception != null)
+        if (metadata != null)
         {
-          log.error("Could not place order {}: {}", transfer, exception.toString());
-          result.setErrorResult(exception);
-          return;
+          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          repository.update(transfer.getId(), SENT, PENDING);
+        }
+        else
+        {
+          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          repository.update(transfer.getId(), SENT, FAILED);
         }
-
-        result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
       });
     }
-    catch (Exception e)
+    catch (JsonProcessingException e)
     {
-      log.error("Unexpected exception!", e);
-      result.setErrorResult(e);
+      throw new RuntimeException("Could not convert " + transfer, e);
     }
-
-    return result;
   }
 }