X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=53ff0f4b6673261e7a4b6050f740aa127a483e2a;hb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a;hp=259b62dc89fb9167f6eddc57877f1402bd61549d;hpb=edc88d6eac8c502ab0297380489ccc9ba706b5f0;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 259b62d..53ff0f4 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -7,8 +7,8 @@ import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService; import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; import de.juplo.kafka.payment.transfer.adapter.TransferController; import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.domain.TransferService; import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.TransferService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -20,11 +20,10 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @SpringBootApplication @@ -35,8 +34,11 @@ public class TransferServiceApplication @Bean(destroyMethod = "close") KafkaProducer producer(TransferServiceProperties properties) { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set"); + Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -46,9 +48,12 @@ public class TransferServiceApplication @Bean KafkaConsumer consumer(TransferServiceProperties properties) { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); + Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -57,32 +62,24 @@ public class TransferServiceApplication return new KafkaConsumer<>(props); } - @Bean(destroyMethod = "shutdown") - ExecutorService executorService() - { - return Executors.newFixedThreadPool(1); - } - @Bean(destroyMethod = "shutdown") TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, - ExecutorService executorService, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) { return new TransferConsumer( - properties.topic, + properties.getTopic(), consumer, - executorService, mapper, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Transfer transfer) + public void create(Long id, Long payer, Long payee, Integer amount) { - productionTransferService.create(transfer); + productionTransferService.create(id, payer, payee, amount); } @Override @@ -92,16 +89,16 @@ public class TransferServiceApplication } @Override - public void handle(Transfer transfer) + public void handleStateChange(Long id, Transfer.State state) { - productionTransferService.handle(transfer); + productionTransferService.handleStateChange(id, state); } }, new TransferConsumer.ConsumerUseCases() { @Override - public void create(Transfer transfer) + public void create(Long id, Long payer, Long payee, Integer amount) { - restoreTransferService.create(transfer); + restoreTransferService.create(id, payer, payee, amount); } @Override @@ -111,9 +108,9 @@ public class TransferServiceApplication } @Override - public void handle(Transfer transfer) + public void handleStateChange(Long id, Transfer.State state) { - restoreTransferService.handle(transfer); + restoreTransferService.handleStateChange(id, state); } }); } @@ -124,7 +121,7 @@ public class TransferServiceApplication ObjectMapper mapper, TransferServiceProperties properties) { - return new KafkaMessagingService(producer, mapper, properties.topic); + return new KafkaMessagingService(producer, mapper, properties.getTopic()); } @Bean