import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferService;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
import java.util.Optional;
import java.util.Properties;
KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
+ LocalStateStoreSettings localStateStoreSettings,
ObjectMapper mapper,
TransferService productionTransferService,
TransferService restoreTransferService)
consumer,
adminClient,
repository,
+ Clock.systemDefaultZone(),
+ localStateStoreSettings.interval,
mapper,
new TransferConsumer.ConsumerUseCases() {
@Override
return new KafkaMessagingService(producer, mapper, properties.getTopic());
}
+ @RequiredArgsConstructor
+ static class LocalStateStoreSettings
+ {
+ final Optional<File> file;
+ final int interval;
+ }
+
+ @Bean
+ LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
+ {
+ if (properties.getStateStoreInterval() < 1)
+ {
+ log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
+ return new LocalStateStoreSettings(Optional.empty(), 0);
+ }
+
+ if (!StringUtils.hasText(properties.getLocalStateStorePath()))
+ {
+ log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
+ return new LocalStateStoreSettings(Optional.empty(), 0);
+ }
+
+ Path path = Path.of(properties.getLocalStateStorePath());
+ log.info("using {} as local state store", path.toAbsolutePath());
+
+ if (Files.notExists(path))
+ {
+ try
+ {
+ Files.createFile(path);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
+ }
+ }
+
+ if (!(Files.isReadable(path) && Files.isWritable(path)))
+ {
+ throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
+ }
+
+ return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
+ }
+
@Bean
InMemoryTransferRepository inMemoryTransferRepository(
+ LocalStateStoreSettings localStateStoreSettings,
TransferServiceProperties properties,
ObjectMapper mapper)
{
- return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
+ return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
}
@Bean