@Override
public CompletableFuture<?> send(Transfer transfer)
+ {
+ return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
+ }
+
+ public CompletableFuture<?> send(Long id, Transfer.State state)
+ {
+ return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
+ }
+
+ private CompletableFuture send(Long id, byte eventType, Object payload)
{
try
{
CompletableFuture<TopicPartition> future = new CompletableFuture<>();
+
ProducerRecord<String, String> record =
new ProducerRecord<>(
topic,
- Long.toString(transfer.getId()),
- mapper.writeValueAsString(transfer));
+ Long.toString(id),
+ mapper.writeValueAsString(payload));
+ record.headers().add(EventType.HEADER, new byte[] { eventType });
producer.send(record, (metadata, exception) ->
{
if (metadata != null)
{
- log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+ log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
}
else
{
- log.error("Could not send {}: {}", transfer, exception.getMessage());
+ log.error("Could not send {}: {}", payload, exception.getMessage());
future.completeExceptionally(exception);
}
});
}
catch (JsonProcessingException e)
{
- throw new RuntimeException("Could not convert " + transfer, e);
+ throw new RuntimeException("Could not convert " + payload, e);
}
}