import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
import de.juplo.kafka.payment.transfer.adapter.TransferController;
import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.domain.TransferService;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.TransferService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
@SpringBootApplication
return new KafkaConsumer<>(props);
}
- @Bean(destroyMethod = "shutdown")
- ExecutorService executorService()
- {
- return Executors.newFixedThreadPool(1);
- }
-
@Bean(destroyMethod = "shutdown")
TransferConsumer transferConsumer(
TransferServiceProperties properties,
KafkaConsumer<String, String> consumer,
- ExecutorService executorService,
ObjectMapper mapper,
TransferService productionTransferService,
TransferService restoreTransferService)
new TransferConsumer(
properties.topic,
consumer,
- executorService,
mapper,
new TransferConsumer.ConsumerUseCases() {
@Override
- public void create(Transfer transfer)
+ public void create(Long id, Long payer, Long payee, Integer amount)
{
- productionTransferService.create(transfer);
+ productionTransferService.create(id, payer, payee, amount);
}
@Override
}
@Override
- public void handle(Transfer transfer)
+ public void handleStateChange(Long id, Transfer.State state)
{
- productionTransferService.handle(transfer);
+ productionTransferService.handleStateChange(id, state);
}
},
new TransferConsumer.ConsumerUseCases() {
@Override
- public void create(Transfer transfer)
+ public void create(Long id, Long payer, Long payee, Integer amount)
{
- restoreTransferService.create(transfer);
+ restoreTransferService.create(id, payer, payee, amount);
}
@Override
}
@Override
- public void handle(Transfer transfer)
+ public void handleStateChange(Long id, Transfer.State state)
{
- restoreTransferService.handle(transfer);
+ restoreTransferService.handleStateChange(id, state);
}
});
}