X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FKafkaMessagingService.java;h=6da7937701228d203c04e34f48a80e3ba41c7e22;hp=3161af32ab0d932715d4b8ee374c3fcbced4f7dc;hb=26809d379a0e024017f70db8c70382f94faf98b6;hpb=78721edb87827a1837f298f24761af8be640c58b diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java index 3161af3..6da7937 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java @@ -24,26 +24,38 @@ public class KafkaMessagingService implements MessagingService @Override public CompletableFuture send(Transfer transfer) + { + return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer)); + } + + public CompletableFuture send(Long id, Transfer.State state) + { + return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state)); + } + + private CompletableFuture send(Long id, byte eventType, Object payload) { try { CompletableFuture future = new CompletableFuture<>(); + ProducerRecord record = new ProducerRecord<>( topic, - Long.toString(transfer.getId()), - mapper.writeValueAsString(transfer)); + Long.toString(id), + mapper.writeValueAsString(payload)); + record.headers().add(EventType.HEADER, new byte[] { eventType }); producer.send(record, (metadata, exception) -> { if (metadata != null) { - log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset()); future.complete(new TopicPartition(metadata.topic(), metadata.partition())); } else { - log.error("Could not send {}: {}", transfer, exception.getMessage()); + log.error("Could not send {}: {}", payload, exception.getMessage()); future.completeExceptionally(exception); } }); @@ -52,7 +64,7 @@ public class KafkaMessagingService implements MessagingService } catch (JsonProcessingException e) { - throw new RuntimeException("Could not convert " + transfer, e); + throw new RuntimeException("Could not convert " + payload, e); } }