X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=eab6abf78046f073bb771adecea1f49133ebe5b5;hp=02842e51c97a5cad302145df63f72480b3691a6d;hb=90762b3c47fc63734707e52acfaeb8e427089f41;hpb=4467c5240397a47b181106a0ae902ed1b71d0c5d diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 02842e5..eab6abf 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -3,10 +3,12 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; +import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService; import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.adapter.TransferController; +import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.TransferService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,9 +21,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import java.util.Optional; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @SpringBootApplication @@ -46,34 +47,69 @@ public class TransferServiceApplication Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new KafkaConsumer<>(props); } - @Bean(destroyMethod = "shutdown") - ExecutorService executorService() - { - return Executors.newFixedThreadPool(1); - } - @Bean(destroyMethod = "shutdown") TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, - ExecutorService executorService, ObjectMapper mapper, - TransferService transferService) + TransferService productionTransferService, + TransferService restoreTransferService) { - TransferConsumer transferConsumer = - new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService); - transferConsumer.start(); - return transferConsumer; + return + new TransferConsumer( + properties.topic, + consumer, + mapper, + new TransferConsumer.ConsumerUseCases() { + @Override + public void create(Long id, Long payer, Long payee, Integer amount) + { + productionTransferService.create(id, payer, payee, amount); + } + + @Override + public Optional get(Long id) + { + return productionTransferService.get(id); + } + + @Override + public void handleStateChange(Long id, Transfer.State state) + { + productionTransferService.handleStateChange(id, state); + } + }, + new TransferConsumer.ConsumerUseCases() { + @Override + public void create(Long id, Long payer, Long payee, Integer amount) + { + restoreTransferService.create(id, payer, payee, amount); + } + + @Override + public Optional get(Long id) + { + return restoreTransferService.get(id); + } + + @Override + public void handleStateChange(Long id, Transfer.State state) + { + restoreTransferService.handleStateChange(id, state); + } + }); } @Bean - MessagingService kafkaMessagingService( + KafkaMessagingService kafkaMessagingService( KafkaProducer producer, ObjectMapper mapper, TransferServiceProperties properties) @@ -82,11 +118,27 @@ public class TransferServiceApplication } @Bean - TransferService transferService( + TransferService productionTransferService( TransferRepository repository, - MessagingService messagingService) + KafkaMessagingService kafkaMessagingService) + { + return new TransferService(repository, kafkaMessagingService); + } + + @Bean + TransferService restoreTransferService( + TransferRepository repository, + NoOpMessageService noOpMessageService) + { + return new TransferService(repository, noOpMessageService); + } + + @Bean + TransferController transferController( + TransferService productionTransferService, + KafkaMessagingService kafkaMessagingService) { - return new TransferService(repository, messagingService); + return new TransferController(productionTransferService, kafkaMessagingService); }