import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.UUIDSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
public class TransferServiceApplication
{
@Bean(destroyMethod = "close")
- KafkaProducer<UUID, String> producer(TransferServiceProperties properties)
+ KafkaProducer<String, String> producer(TransferServiceProperties properties)
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(props);
package de.juplo.kafka.payment.transfer.domain;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.util.UUID;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+
@Slf4j
@RequiredArgsConstructor
public class TransferService
{
private final TransferRepository repository;
- private final KafkaProducer<UUID, String> producer;
+ private final KafkaProducer<String, String> producer;
private final ObjectMapper mapper;
private final String topic;
.ifPresentOrElse(
stored ->
{
- switch (stored.getState())
+ if (!transfer.equals(stored))
+ throw new IllegalArgumentException(
+ "Re-Initiation of transfer with different data: old=" +
+ stored +
+ ", new=" +
+ transfer);
+
+ if (stored.getState() == FAILED)
{
- case FAILED:
+ repository.update(transfer.getId(), FAILED, SENT);
+ log.info("Resending faild transfer: " + stored);
+ send(transfer);
}
},
() ->
{
+ repository.store(transfer);
+ send(transfer);
});
}
- private void create(Transfer transfer)
+ private void send(Transfer transfer)
{
try
{
- ProducerRecord<UUID, String> record =
+ ProducerRecord<String, String> record =
new ProducerRecord<>(
topic,
- transfer.getId(),
+ Long.toString(transfer.getId()),
mapper.writeValueAsString(transfer));
producer.send(record, (metadata, exception) ->
{
- if (exception != null)
+ if (metadata != null)
{
- log.error("Could not place order {}: {}", transfer, exception.toString());
- result.setErrorResult(exception);
- return;
+ log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+ repository.update(transfer.getId(), SENT, PENDING);
+ }
+ else
+ {
+ log.error("Could not send {}: {}", transfer, exception.getMessage());
+ repository.update(transfer.getId(), SENT, FAILED);
}
-
- result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
});
}
- catch (Exception e)
+ catch (JsonProcessingException e)
{
- log.error("Unexpected exception!", e);
- result.setErrorResult(e);
+ throw new RuntimeException("Could not convert " + transfer, e);
}
-
- return result;
}
}