X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=5114a1c4fff0685bc016ec1c181f898a515f919c;hb=2432aeedb30ac4c1405045514d8eacb791a4d352;hp=a82f8b109abeaec2ae9106f4e66f311dcb1347ff;hpb=c64f93de3e59af674885fdad08c521d82f4802d1;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index a82f8b1..5114a1c 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -7,6 +7,7 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferService; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -22,7 +23,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; import java.util.Optional; import java.util.Properties; @@ -87,6 +94,7 @@ public class TransferServiceApplication KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + LocalStateStoreSettings localStateStoreSettings, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) @@ -99,6 +107,8 @@ public class TransferServiceApplication consumer, adminClient, repository, + Clock.systemDefaultZone(), + localStateStoreSettings.interval, mapper, new TransferConsumer.ConsumerUseCases() { @Override @@ -149,12 +159,58 @@ public class TransferServiceApplication return new KafkaMessagingService(producer, mapper, properties.getTopic()); } + @RequiredArgsConstructor + static class LocalStateStoreSettings + { + final Optional file; + final int interval; + } + + @Bean + LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties) + { + if (properties.getStateStoreInterval() < 1) + { + log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + if (!StringUtils.hasText(properties.getLocalStateStorePath())) + { + log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + Path path = Path.of(properties.getLocalStateStorePath()); + log.info("using {} as local state store", path.toAbsolutePath()); + + if (Files.notExists(path)) + { + try + { + Files.createFile(path); + } + catch (IOException e) + { + throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath()); + } + } + + if (!(Files.isReadable(path) && Files.isWritable(path))) + { + throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath()); + } + + return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval()); + } + @Bean InMemoryTransferRepository inMemoryTransferRepository( + LocalStateStoreSettings localStateStoreSettings, TransferServiceProperties properties, ObjectMapper mapper) { - return new InMemoryTransferRepository(properties.getNumPartitions(), mapper); + return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper); } @Bean