1 package de.juplo.kafka.payment.transfer;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.*;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import de.juplo.kafka.payment.transfer.ports.TransferService;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.admin.AdminClient;
13 import org.apache.kafka.clients.admin.AdminClientConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.SpringApplication;
19 import org.springframework.boot.autoconfigure.SpringBootApplication;
20 import org.springframework.boot.context.properties.EnableConfigurationProperties;
21 import org.springframework.context.annotation.Bean;
22 import org.springframework.util.Assert;
23 import org.springframework.util.StringUtils;
24 import org.springframework.web.reactive.function.client.WebClient;
27 import java.io.IOException;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.time.Clock;
31 import java.util.Optional;
32 import java.util.Properties;
35 @SpringBootApplication
36 @EnableConfigurationProperties(TransferServiceProperties.class)
38 public class TransferServiceApplication
40 @Bean(destroyMethod = "close")
41 AdminClient adminClient(TransferServiceProperties properties)
43 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
45 Properties props = new Properties();
46 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
48 return AdminClient.create(props);
51 @Bean(destroyMethod = "close")
52 KafkaProducer<String, String> producer(TransferServiceProperties properties)
54 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
55 Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
56 Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
58 Properties props = new Properties();
59 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
60 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
61 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
62 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
63 props.put(TransferPartitioner.TOPIC, properties.getTopic());
64 props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
66 return new KafkaProducer<>(props);
// Bean factory for the TransferConsumer. The bean is registered with
// destroyMethod = "shutdown", so that method is invoked when the bean is destroyed.
// NOTE(review): parts of this method (a parameter between localStateStoreSettings and
// productionTransferService, the opening of the constructor call and some of its
// arguments) are not visible in this excerpt -- confirm against the full file.
69 @Bean(destroyMethod = "shutdown")
70 TransferConsumer transferConsumer(
71 TransferServiceProperties properties,
72 AdminClient adminClient,
73 TransferRepository repository,
74 LocalStateStoreSettings localStateStoreSettings,
76 TransferService productionTransferService,
77 TransferService restoreTransferService)
// Fail fast if the settings needed to join the consumer group are missing.
79 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
80 Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
81 Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
// Arguments handed on from the configuration properties.
85 properties.getBootstrapServers(),
86 properties.getGroupId(),
87 properties.getGroupInstanceId(),
88 properties.getTopic(),
89 properties.getNumPartitions(),
90 properties.getInstanceIdUriMapping(),
// The system clock in the default time zone, followed by the interval taken from the
// local state store settings (0 when local storage of state is deactivated).
93 Clock.systemDefaultZone(),
94 localStateStoreSettings.interval,
// First set of use cases: every call is delegated to productionTransferService.
96 new TransferConsumer.ConsumerUseCases() {
98 public void create(Long id, Long payer, Long payee, Integer amount)
100 productionTransferService.create(id, payer, payee, amount);
104 public Optional<Transfer> get(Long id)
106 return productionTransferService.get(id);
110 public void handleStateChange(Long id, Transfer.State state)
112 productionTransferService.handleStateChange(id, state);
// Second set of use cases: every call is delegated to restoreTransferService.
115 new TransferConsumer.ConsumerUseCases() {
117 public void create(Long id, Long payer, Long payee, Integer amount)
119 restoreTransferService.create(id, payer, payee, amount);
123 public Optional<Transfer> get(Long id)
125 return restoreTransferService.get(id);
129 public void handleStateChange(Long id, Transfer.State state)
131 restoreTransferService.handleStateChange(id, state);
// Creates the KafkaMessagingService from the producer, a mapper and the configured topic.
// NOTE(review): `mapper` is used below but its declaration is not visible in this
// excerpt -- presumably an ObjectMapper parameter between `producer` and `properties`; confirm.
137 KafkaMessagingService kafkaMessagingService(
138 KafkaProducer<String, String> producer,
140 TransferServiceProperties properties)
142 return new KafkaMessagingService(producer, mapper, properties.getTopic());
// Value holder for the local state store configuration; Lombok's
// @RequiredArgsConstructor generates the constructor for its final fields.
// NOTE(review): callers pass two constructor arguments and read an `interval` field,
// but that field's declaration is not visible in this excerpt -- confirm its type.
145 @RequiredArgsConstructor
146 static class LocalStateStoreSettings
// File backing the local state store; empty when local storage of state is deactivated.
148 final Optional<File> file;
153 LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
155 if (properties.getStateStoreInterval() < 1)
157 log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
158 return new LocalStateStoreSettings(Optional.empty(), 0);
161 if (!StringUtils.hasText(properties.getLocalStateStorePath()))
163 log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
164 return new LocalStateStoreSettings(Optional.empty(), 0);
167 Path path = Path.of(properties.getLocalStateStorePath());
168 log.info("using {} as local state store", path.toAbsolutePath());
170 if (Files.notExists(path))
174 Files.createFile(path);
176 catch (IOException e)
178 throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
182 if (!(Files.isReadable(path) && Files.isWritable(path)))
184 throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
187 return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
// Creates the InMemoryTransferRepository from the (optional) local state store file,
// the configured number of partitions and a mapper.
// NOTE(review): `mapper` is used below but its declaration is not visible in this
// excerpt -- presumably an ObjectMapper parameter after `properties`; confirm.
191 InMemoryTransferRepository inMemoryTransferRepository(
192 LocalStateStoreSettings localStateStoreSettings,
193 TransferServiceProperties properties,
196 return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
200 TransferService productionTransferService(
201 TransferRepository repository,
202 KafkaMessagingService kafkaMessagingService)
204 return new TransferService(repository, kafkaMessagingService);
208 TransferService restoreTransferService(
209 TransferRepository repository,
210 NoOpMessageService noOpMessageService)
212 return new TransferService(repository, noOpMessageService);
// Creates the TransferController from the production transfer service, the Kafka
// messaging service and (presumably) the transfer consumer.
216 TransferController transferController(
217 TransferService productionTransferService,
218 KafkaMessagingService kafkaMessagingService,
219 TransferConsumer transferConsumer)
221 return new TransferController(
222 productionTransferService,
223 kafkaMessagingService,
// NOTE(review): the remaining constructor argument(s) are not visible in this
// excerpt -- presumably `transferConsumer`, which is otherwise unused; confirm.
229 public static void main(String[] args)
231 SpringApplication.run(TransferServiceApplication.class, args);