- Transfer transfer = mapper.readValue(record.value(), Transfer.class);
- handleTransferUseCase.handle(transfer);
+ byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+ switch (eventType)
+ {
+ case EventType.NEW_TRANSFER:
+
+ NewTransferEvent newTransferEvent =
+ mapper.readValue(record.value(), NewTransferEvent.class);
+ createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
+ break;
+
+ case EventType.TRANSFER_STATE_CHANGED:
+
+ TransferStateChangedEvent stateChangedEvent =
+ mapper.readValue(record.value(), TransferStateChangedEvent.class);
+ getTransferUseCase
+ .get(stateChangedEvent.getId())
+ .ifPresentOrElse(
+ transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
+ () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+ break;
+ }