WIP: instance-mapping from assignor
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.*;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import de.juplo.kafka.payment.transfer.ports.TransferService;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.admin.AdminClient;
13 import org.apache.kafka.clients.admin.AdminClientConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringSerializer;
18 import org.springframework.boot.SpringApplication;
19 import org.springframework.boot.autoconfigure.SpringBootApplication;
20 import org.springframework.boot.context.properties.EnableConfigurationProperties;
21 import org.springframework.context.annotation.Bean;
22 import org.springframework.util.Assert;
23 import org.springframework.util.StringUtils;
24 import org.springframework.web.reactive.function.client.WebClient;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.time.Clock;
31 import java.util.Optional;
32 import java.util.Properties;
33
34
35 @SpringBootApplication
36 @EnableConfigurationProperties(TransferServiceProperties.class)
37 @Slf4j
38 public class TransferServiceApplication
39 {
40   @Bean(destroyMethod = "close")
41   AdminClient adminClient(TransferServiceProperties properties)
42   {
43     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
44
45     Properties props = new Properties();
46     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
47
48     return AdminClient.create(props);
49   }
50
51   @Bean(destroyMethod = "close")
52   KafkaProducer<String, String> producer(TransferServiceProperties properties)
53   {
54     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
55     Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
56     Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
57
58     Properties props = new Properties();
59     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
60     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
61     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
62     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
63     props.put(TransferPartitioner.TOPIC, properties.getTopic());
64     props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
65
66     return new KafkaProducer<>(props);
67   }
68
69   @Bean(destroyMethod = "shutdown")
70   TransferConsumer transferConsumer(
71       TransferServiceProperties properties,
72       AdminClient adminClient,
73       TransferRepository repository,
74       LocalStateStoreSettings localStateStoreSettings,
75       ObjectMapper mapper,
76       TransferService productionTransferService,
77       TransferService restoreTransferService)
78   {
79     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
80     Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
81     Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
82
83     return
84         new TransferConsumer(
85             properties.getBootstrapServers(),
86             properties.getGroupId(),
87             properties.getGroupInstanceId(),
88             properties.getTopic(),
89             properties.getNumPartitions(),
90             properties.getInstanceIdUriMapping(),
91             adminClient,
92             repository,
93             Clock.systemDefaultZone(),
94             localStateStoreSettings.interval,
95             mapper,
96             new TransferConsumer.ConsumerUseCases() {
97               @Override
98               public void create(Long id, Long payer, Long payee, Integer amount)
99               {
100                 productionTransferService.create(id, payer, payee, amount);
101               }
102
103               @Override
104               public Optional<Transfer> get(Long id)
105               {
106                 return productionTransferService.get(id);
107               }
108
109               @Override
110               public void handleStateChange(Long id, Transfer.State state)
111               {
112                 productionTransferService.handleStateChange(id, state);
113               }
114             },
115             new TransferConsumer.ConsumerUseCases() {
116               @Override
117               public void create(Long id, Long payer, Long payee, Integer amount)
118               {
119                 restoreTransferService.create(id, payer, payee, amount);
120               }
121
122               @Override
123               public Optional<Transfer> get(Long id)
124               {
125                 return restoreTransferService.get(id);
126               }
127
128               @Override
129               public void handleStateChange(Long id, Transfer.State state)
130               {
131                 restoreTransferService.handleStateChange(id, state);
132               }
133             });
134   }
135
136   @Bean
137   KafkaMessagingService kafkaMessagingService(
138       KafkaProducer<String, String> producer,
139       ObjectMapper mapper,
140       TransferServiceProperties properties)
141   {
142     return new KafkaMessagingService(producer, mapper, properties.getTopic());
143   }
144
145   @RequiredArgsConstructor
146   static class LocalStateStoreSettings
147   {
148     final Optional<File> file;
149     final int interval;
150   }
151
152   @Bean
153   LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
154   {
155     if (properties.getStateStoreInterval() < 1)
156     {
157       log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
158       return new LocalStateStoreSettings(Optional.empty(), 0);
159     }
160
161     if (!StringUtils.hasText(properties.getLocalStateStorePath()))
162     {
163       log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
164       return new LocalStateStoreSettings(Optional.empty(), 0);
165     }
166
167     Path path = Path.of(properties.getLocalStateStorePath());
168     log.info("using {} as local state store", path.toAbsolutePath());
169
170     if (Files.notExists(path))
171     {
172       try
173       {
174         Files.createFile(path);
175       }
176       catch (IOException e)
177       {
178         throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
179       }
180     }
181
182     if (!(Files.isReadable(path) && Files.isWritable(path)))
183     {
184       throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
185     }
186
187     return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
188   }
189
190   @Bean
191   InMemoryTransferRepository inMemoryTransferRepository(
192       LocalStateStoreSettings localStateStoreSettings,
193       TransferServiceProperties properties,
194       ObjectMapper mapper)
195   {
196     return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
197   }
198
199   @Bean
200   TransferService productionTransferService(
201       TransferRepository repository,
202       KafkaMessagingService kafkaMessagingService)
203   {
204     return new TransferService(repository, kafkaMessagingService);
205   }
206
207   @Bean
208   TransferService restoreTransferService(
209       TransferRepository repository,
210       NoOpMessageService noOpMessageService)
211   {
212     return new TransferService(repository, noOpMessageService);
213   }
214
215   @Bean
216   TransferController transferController(
217       TransferService productionTransferService,
218       KafkaMessagingService kafkaMessagingService,
219       TransferConsumer transferConsumer)
220   {
221     return new TransferController(
222         productionTransferService,
223         kafkaMessagingService,
224         transferConsumer,
225         WebClient.create());
226   }
227
228
229   public static void main(String[] args)
230   {
231     SpringApplication.run(TransferServiceApplication.class, args);
232   }
233 }