Bugfix: Check for existence of a new transfer requires a remote-call
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.*;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import de.juplo.kafka.payment.transfer.ports.TransferService;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.admin.AdminClient;
13 import org.apache.kafka.clients.admin.AdminClientConfig;
14 import org.apache.kafka.clients.consumer.ConsumerConfig;
15 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
16 import org.apache.kafka.clients.consumer.KafkaConsumer;
17 import org.apache.kafka.clients.producer.KafkaProducer;
18 import org.apache.kafka.clients.producer.ProducerConfig;
19 import org.apache.kafka.common.serialization.StringDeserializer;
20 import org.apache.kafka.common.serialization.StringSerializer;
21 import org.springframework.boot.SpringApplication;
22 import org.springframework.boot.autoconfigure.SpringBootApplication;
23 import org.springframework.boot.context.properties.EnableConfigurationProperties;
24 import org.springframework.context.annotation.Bean;
25 import org.springframework.util.Assert;
26 import org.springframework.util.StringUtils;
27 import org.springframework.web.reactive.function.client.WebClient;
28
29 import java.io.File;
30 import java.io.IOException;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.time.Clock;
34 import java.util.Optional;
35 import java.util.Properties;
36
37
38 @SpringBootApplication
39 @EnableConfigurationProperties(TransferServiceProperties.class)
40 @Slf4j
41 public class TransferServiceApplication
42 {
43   @Bean(destroyMethod = "close")
44   AdminClient adminClient(TransferServiceProperties properties)
45   {
46     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
47
48     Properties props = new Properties();
49     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
50
51     return AdminClient.create(props);
52   }
53
54   @Bean(destroyMethod = "close")
55   KafkaProducer<String, String> producer(TransferServiceProperties properties)
56   {
57     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
58     Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
59     Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
60
61     Properties props = new Properties();
62     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
63     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
64     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
65     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
66     props.put(TransferPartitioner.TOPIC, properties.getTopic());
67     props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
68
69     return new KafkaProducer<>(props);
70   }
71
72   @Bean
73   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
74   {
75     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
76     Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
77     Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
78
79     Properties props = new Properties();
80     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
81     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
82     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
83     props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
84     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
85     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
86     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
87     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
88
89     return new KafkaConsumer<>(props);
90   }
91
92   @Bean(destroyMethod = "shutdown")
93   TransferConsumer transferConsumer(
94       TransferServiceProperties properties,
95       KafkaConsumer<String, String> consumer,
96       AdminClient adminClient,
97       TransferRepository repository,
98       LocalStateStoreSettings localStateStoreSettings,
99       ObjectMapper mapper,
100       TransferService productionTransferService,
101       TransferService restoreTransferService)
102   {
103     return
104         new TransferConsumer(
105             properties.getTopic(),
106             properties.getNumPartitions(),
107             properties.getInstanceIdUriMapping(),
108             consumer,
109             adminClient,
110             repository,
111             Clock.systemDefaultZone(),
112             localStateStoreSettings.interval,
113             mapper,
114             new TransferConsumer.ConsumerUseCases() {
115               @Override
116               public void create(Long id, Long payer, Long payee, Integer amount)
117               {
118                 productionTransferService.create(id, payer, payee, amount);
119               }
120
121               @Override
122               public Optional<Transfer> get(Long id)
123               {
124                 return productionTransferService.get(id);
125               }
126
127               @Override
128               public void handleStateChange(Long id, Transfer.State state)
129               {
130                 productionTransferService.handleStateChange(id, state);
131               }
132             },
133             new TransferConsumer.ConsumerUseCases() {
134               @Override
135               public void create(Long id, Long payer, Long payee, Integer amount)
136               {
137                 restoreTransferService.create(id, payer, payee, amount);
138               }
139
140               @Override
141               public Optional<Transfer> get(Long id)
142               {
143                 return restoreTransferService.get(id);
144               }
145
146               @Override
147               public void handleStateChange(Long id, Transfer.State state)
148               {
149                 restoreTransferService.handleStateChange(id, state);
150               }
151             });
152   }
153
154   @Bean
155   KafkaMessagingService kafkaMessagingService(
156       KafkaProducer<String, String> producer,
157       ObjectMapper mapper,
158       TransferServiceProperties properties)
159   {
160     return new KafkaMessagingService(producer, mapper, properties.getTopic());
161   }
162
163   @RequiredArgsConstructor
164   static class LocalStateStoreSettings
165   {
166     final Optional<File> file;
167     final int interval;
168   }
169
170   @Bean
171   LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
172   {
173     if (properties.getStateStoreInterval() < 1)
174     {
175       log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
176       return new LocalStateStoreSettings(Optional.empty(), 0);
177     }
178
179     if (!StringUtils.hasText(properties.getLocalStateStorePath()))
180     {
181       log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
182       return new LocalStateStoreSettings(Optional.empty(), 0);
183     }
184
185     Path path = Path.of(properties.getLocalStateStorePath());
186     log.info("using {} as local state store", path.toAbsolutePath());
187
188     if (Files.notExists(path))
189     {
190       try
191       {
192         Files.createFile(path);
193       }
194       catch (IOException e)
195       {
196         throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
197       }
198     }
199
200     if (!(Files.isReadable(path) && Files.isWritable(path)))
201     {
202       throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
203     }
204
205     return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
206   }
207
208   @Bean
209   InMemoryTransferRepository inMemoryTransferRepository(
210       LocalStateStoreSettings localStateStoreSettings,
211       TransferServiceProperties properties,
212       ObjectMapper mapper)
213   {
214     return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
215   }
216
217   @Bean
218   TransferService productionTransferService(
219       TransferRepository repository,
220       KafkaMessagingService kafkaMessagingService)
221   {
222     return new TransferService(repository, kafkaMessagingService);
223   }
224
225   @Bean
226   TransferService restoreTransferService(
227       TransferRepository repository,
228       NoOpMessageService noOpMessageService)
229   {
230     return new TransferService(repository, noOpMessageService);
231   }
232
233   @Bean
234   TransferController transferController(
235       TransferService productionTransferService,
236       KafkaMessagingService kafkaMessagingService,
237       TransferConsumer transferConsumer)
238   {
239     return new TransferController(
240         productionTransferService,
241         kafkaMessagingService,
242         transferConsumer,
243         WebClient.create());
244   }
245
246
247   public static void main(String[] args)
248   {
249     SpringApplication.run(TransferServiceApplication.class, args);
250   }
251 }