Introduced different Events for the creation and the state-changes
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / KafkaMessagingService.java
index 3161af3..6da7937 100644 (file)
@@ -24,26 +24,38 @@ public class KafkaMessagingService implements MessagingService
 
   @Override
   public CompletableFuture<?> send(Transfer transfer)
+  {
+    return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
+  }
+
+  public CompletableFuture<?> send(Long id, Transfer.State state)
+  {
+    return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
+  }
+
+  private CompletableFuture send(Long id, byte eventType, Object payload)
   {
     try
     {
       CompletableFuture<TopicPartition> future = new CompletableFuture<>();
+
       ProducerRecord<String, String> record =
           new ProducerRecord<>(
               topic,
-              Long.toString(transfer.getId()),
-              mapper.writeValueAsString(transfer));
+              Long.toString(id),
+              mapper.writeValueAsString(payload));
+      record.headers().add(EventType.HEADER, new byte[] { eventType });
 
       producer.send(record, (metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
           future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
         }
         else
         {
-          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          log.error("Could not send {}: {}", payload, exception.getMessage());
           future.completeExceptionally(exception);
         }
       });
@@ -52,7 +64,7 @@ public class KafkaMessagingService implements MessagingService
     }
     catch (JsonProcessingException e)
     {
-      throw new RuntimeException("Could not convert " + transfer, e);
+      throw new RuntimeException("Could not convert " + payload, e);
     }
   }