import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+
@RequestMapping("/consumer")
@ResponseBody
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executorService;
private final ObjectMapper mapper;
- private final HandleTransferUseCase handleTransferUseCase;
+ private final GetTransferUseCase getTransferUseCase;
+ private final CreateTransferUseCase createTransferUseCase;
+ private final HandleStateChangeUseCase handleStateChangeUseCase;
private boolean running = false;
private Future<?> future = null;
try
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
- log.debug("polled {} records", records.count());
+ if (records.count() > 0)
+ log.debug("polled {} records", records.count());
records.forEach(record ->
{
try
{
- Transfer transfer = mapper.readValue(record.value(), Transfer.class);
- handleTransferUseCase.handle(transfer);
+ byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+ switch (eventType)
+ {
+ case EventType.NEW_TRANSFER:
+
+ NewTransferEvent newTransferEvent =
+ mapper.readValue(record.value(), NewTransferEvent.class);
+ createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
+ break;
+
+ case EventType.TRANSFER_STATE_CHANGED:
+
+ TransferStateChangedEvent stateChangedEvent =
+ mapper.readValue(record.value(), TransferStateChangedEvent.class);
+ getTransferUseCase
+ .get(stateChangedEvent.getId())
+ .ifPresentOrElse(
+ transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
+ () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+ break;
+ }
}
catch (JsonProcessingException e)
{