WIP
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.*;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import de.juplo.kafka.payment.transfer.ports.TransferService;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.admin.AdminClient;
13 import org.apache.kafka.clients.admin.AdminClientConfig;
14 import org.apache.kafka.clients.consumer.ConsumerConfig;
15 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
16 import org.apache.kafka.clients.consumer.KafkaConsumer;
17 import org.apache.kafka.clients.producer.KafkaProducer;
18 import org.apache.kafka.clients.producer.ProducerConfig;
19 import org.apache.kafka.common.serialization.StringDeserializer;
20 import org.apache.kafka.common.serialization.StringSerializer;
21 import org.springframework.boot.SpringApplication;
22 import org.springframework.boot.autoconfigure.SpringBootApplication;
23 import org.springframework.boot.context.properties.EnableConfigurationProperties;
24 import org.springframework.context.annotation.Bean;
25 import org.springframework.util.Assert;
26 import org.springframework.util.StringUtils;
27 import org.springframework.web.reactive.function.client.WebClient;
28
29 import java.io.File;
30 import java.io.IOException;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.time.Clock;
34 import java.util.Optional;
35 import java.util.Properties;
36
37
38 @SpringBootApplication
39 @EnableConfigurationProperties(TransferServiceProperties.class)
40 @Slf4j
41 public class TransferServiceApplication
42 {
43   @Bean(destroyMethod = "close")
44   AdminClient adminClient(TransferServiceProperties properties)
45   {
46     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
47
48     Properties props = new Properties();
49     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
50
51     return AdminClient.create(props);
52   }
53
54   @Bean(destroyMethod = "close")
55   KafkaProducer<String, String> producer(TransferServiceProperties properties)
56   {
57     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
58     Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
59     Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
60
61     Properties props = new Properties();
62     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
63     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
64     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
65     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
66     props.put(TransferPartitioner.TOPIC, properties.getTopic());
67     props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
68
69     return new KafkaProducer<>(props);
70   }
71
72   @Bean
73   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
74   {
75     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
76     Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
77     Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
78
79     Properties props = new Properties();
80     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
81     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
82     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
83     props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
84     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
85     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
86     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
87     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
88
89     return new KafkaConsumer<>(props);
90   }
91
92   @Bean(destroyMethod = "shutdown")
93   TransferConsumer transferConsumer(
94       TransferServiceProperties properties,
95       KafkaConsumer<String, String> consumer,
96       AdminClient adminClient,
97       TransferRepository repository,
98       LocalStateStoreSettings localStateStoreSettings,
99       ObjectMapper mapper,
100       TransferService productionTransferService,
101       TransferService restoreTransferService)
102   {
103     return
104         new TransferConsumer(
105             properties.getTopic(),
106             properties.getNumPartitions(),
107             properties.getInstanceIdUriMapping(),
108             consumer,
109             adminClient,
110             repository,
111             Clock.systemDefaultZone(),
112             localStateStoreSettings.interval,
113             mapper,
114             new TransferConsumer.ConsumerUseCases() {
115               @Override
116               public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
117               {
118                 return productionTransferService.create(id, payer, payee, amount);
119               }
120
121               @Override
122               public Optional<Transfer> get(Long id)
123               {
124                 return productionTransferService.get(id);
125               }
126
127               @Override
128               public TransferStateChangedEvent handleStateChange(
129                   TransferStateChangedEvent stateChangedEvent)
130               {
131                 return productionTransferService.handleStateChange(stateChangedEvent);
132               }
133             },
134             new TransferConsumer.ConsumerUseCases() {
135               @Override
136               public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
137               {
138                 return restoreTransferService.create(id, payer, payee, amount);
139               }
140
141               @Override
142               public Optional<Transfer> get(Long id)
143               {
144                 return restoreTransferService.get(id);
145               }
146
147               @Override
148               public TransferStateChangedEvent handleStateChange(
149                   TransferStateChangedEvent stateChangedEvent)
150               {
151                 return restoreTransferService.handleStateChange(stateChangedEvent);
152               }
153             });
154   }
155
156   @Bean
157   KafkaMessagingService kafkaMessagingService(
158       KafkaProducer<String, String> producer,
159       ObjectMapper mapper,
160       TransferServiceProperties properties)
161   {
162     return new KafkaMessagingService(producer, mapper, properties.getTopic());
163   }
164
165   @RequiredArgsConstructor
166   static class LocalStateStoreSettings
167   {
168     final Optional<File> file;
169     final int interval;
170   }
171
172   @Bean
173   LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
174   {
175     if (properties.getStateStoreInterval() < 1)
176     {
177       log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
178       return new LocalStateStoreSettings(Optional.empty(), 0);
179     }
180
181     if (!StringUtils.hasText(properties.getLocalStateStorePath()))
182     {
183       log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
184       return new LocalStateStoreSettings(Optional.empty(), 0);
185     }
186
187     Path path = Path.of(properties.getLocalStateStorePath());
188     log.info("using {} as local state store", path.toAbsolutePath());
189
190     if (Files.notExists(path))
191     {
192       try
193       {
194         Files.createFile(path);
195       }
196       catch (IOException e)
197       {
198         throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
199       }
200     }
201
202     if (!(Files.isReadable(path) && Files.isWritable(path)))
203     {
204       throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
205     }
206
207     return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
208   }
209
210   @Bean
211   InMemoryTransferRepository inMemoryTransferRepository(
212       LocalStateStoreSettings localStateStoreSettings,
213       TransferServiceProperties properties,
214       ObjectMapper mapper)
215   {
216     return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
217   }
218
219   @Bean
220   TransferService productionTransferService(
221       TransferRepository repository,
222       KafkaMessagingService kafkaMessagingService)
223   {
224     return new TransferService(repository, kafkaMessagingService);
225   }
226
227   @Bean
228   TransferService restoreTransferService(
229       TransferRepository repository,
230       NoOpMessageService noOpMessageService)
231   {
232     return new TransferService(repository, noOpMessageService);
233   }
234
235   @Bean
236   TransferController transferController(
237       TransferService productionTransferService,
238       KafkaMessagingService kafkaMessagingService,
239       TransferConsumer transferConsumer)
240   {
241     return new TransferController(
242         productionTransferService,
243         kafkaMessagingService,
244         transferConsumer,
245         WebClient.create());
246   }
247
248
249   public static void main(String[] args)
250   {
251     SpringApplication.run(TransferServiceApplication.class, args);
252   }
253 }