X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=e94f703d34a64de2dbe65b6c86ff148c84620b5d;hp=58a3af2d0c2bf7cb1e4e430dee9603f22d0d7bac;hb=HEAD;hpb=26809d379a0e024017f70db8c70382f94faf98b6 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 58a3af2..e94f703 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -2,13 +2,17 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; -import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; -import de.juplo.kafka.payment.transfer.domain.TransferService; -import de.juplo.kafka.payment.transfer.ports.MessagingService; +import de.juplo.kafka.payment.transfer.adapter.*; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.TransferService; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -18,10 +22,17 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.springframework.web.reactive.function.client.WebClient; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; +import java.util.Optional; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @SpringBootApplication @@ -29,13 +40,31 @@ import java.util.concurrent.Executors; @Slf4j public class TransferServiceApplication { + @Bean(destroyMethod = "close") + AdminClient adminClient(TransferServiceProperties properties) + { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + + return AdminClient.create(props); + } + @Bean(destroyMethod = "close") KafkaProducer producer(TransferServiceProperties properties) { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set"); + Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set"); + Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class); + props.put(TransferPartitioner.TOPIC, properties.getTopic()); + props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions()); return new KafkaProducer<>(props); } @@ -43,57 +72,177 @@ public class TransferServiceApplication @Bean KafkaConsumer consumer(TransferServiceProperties properties) { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); + Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set"); + Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); - props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId()); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new KafkaConsumer<>(props); } - @Bean(destroyMethod = "shutdown") - ExecutorService executorService() - { - return Executors.newFixedThreadPool(1); - } - @Bean(destroyMethod = "shutdown") TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, - ExecutorService executorService, + AdminClient adminClient, + TransferRepository repository, + LocalStateStoreSettings localStateStoreSettings, ObjectMapper mapper, - TransferService transferService) + TransferService productionTransferService, + TransferService restoreTransferService) { - TransferConsumer transferConsumer = + return new TransferConsumer( - properties.topic, + properties.getTopic(), + properties.getNumPartitions(), + properties.getInstanceIdUriMapping(), consumer, - executorService, + adminClient, + repository, + Clock.systemDefaultZone(), + localStateStoreSettings.interval, mapper, - transferService, - transferService, - transferService); - transferConsumer.start(); - return transferConsumer; + new TransferConsumer.ConsumerUseCases() { + @Override + public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount) + { + return productionTransferService.create(id, payer, payee, amount); + } + + @Override + public Optional get(Long id) + { + return productionTransferService.get(id); + } + + @Override + public TransferStateChangedEvent handleStateChange( + TransferStateChangedEvent stateChangedEvent) + { + return productionTransferService.handleStateChange(stateChangedEvent); + } + }, + new TransferConsumer.ConsumerUseCases() { + @Override + public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount) + { + return restoreTransferService.create(id, payer, payee, amount); + } + + @Override + public Optional get(Long id) + { + return restoreTransferService.get(id); + } + + @Override + public TransferStateChangedEvent handleStateChange( + TransferStateChangedEvent stateChangedEvent) + { + return restoreTransferService.handleStateChange(stateChangedEvent); + } + }); } @Bean - MessagingService kafkaMessagingService( + KafkaMessagingService kafkaMessagingService( KafkaProducer producer, ObjectMapper mapper, TransferServiceProperties properties) { - return new KafkaMessagingService(producer, mapper, properties.topic); + return new KafkaMessagingService(producer, mapper, properties.getTopic()); + } + + @RequiredArgsConstructor + static class LocalStateStoreSettings + { + final Optional file; + final int interval; + } + + @Bean + LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties) + { + if (properties.getStateStoreInterval() < 1) + { + log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + if (!StringUtils.hasText(properties.getLocalStateStorePath())) + { + log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + Path path = Path.of(properties.getLocalStateStorePath()); + log.info("using {} as local state store", path.toAbsolutePath()); + + if (Files.notExists(path)) + { + try + { + Files.createFile(path); + } + catch (IOException e) + { + throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath()); + } + } + + if (!(Files.isReadable(path) && Files.isWritable(path))) + { + throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath()); + } + + return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval()); + } + + @Bean + InMemoryTransferRepository inMemoryTransferRepository( + LocalStateStoreSettings localStateStoreSettings, + TransferServiceProperties properties, + ObjectMapper mapper) + { + return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper); } @Bean - TransferService transferService( + TransferService productionTransferService( TransferRepository repository, - MessagingService messagingService) + KafkaMessagingService kafkaMessagingService) + { + return new TransferService(repository, kafkaMessagingService); + } + + @Bean + TransferService restoreTransferService( + TransferRepository repository, + NoOpMessageService noOpMessageService) + { + return new TransferService(repository, noOpMessageService); + } + + @Bean + TransferController transferController( + TransferService productionTransferService, + KafkaMessagingService kafkaMessagingService, + TransferConsumer transferConsumer) { - return new TransferService(repository, messagingService); + return new TransferController( + productionTransferService, + kafkaMessagingService, + transferConsumer, + WebClient.create()); }