Automatically rebuild the state after a crash / restart
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
7 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
8 import de.juplo.kafka.payment.transfer.adapter.TransferController;
9 import de.juplo.kafka.payment.transfer.domain.Transfer;
10 import de.juplo.kafka.payment.transfer.domain.TransferService;
11 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
12 import lombok.extern.slf4j.Slf4j;
13 import org.apache.kafka.clients.consumer.ConsumerConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.SpringApplication;
20 import org.springframework.boot.autoconfigure.SpringBootApplication;
21 import org.springframework.boot.context.properties.EnableConfigurationProperties;
22 import org.springframework.context.annotation.Bean;
23
24 import java.util.Optional;
25 import java.util.Properties;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28
29
30 @SpringBootApplication
31 @EnableConfigurationProperties(TransferServiceProperties.class)
32 @Slf4j
33 public class TransferServiceApplication
34 {
35   @Bean(destroyMethod = "close")
36   KafkaProducer<String, String> producer(TransferServiceProperties properties)
37   {
38     Properties props = new Properties();
39     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
40     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
41     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
42
43     return new KafkaProducer<>(props);
44   }
45
46   @Bean
47   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
48   {
49     Properties props = new Properties();
50     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
51     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
52     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
53     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
54     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
55     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
56
57     return new KafkaConsumer<>(props);
58   }
59
60   @Bean(destroyMethod = "shutdown")
61   ExecutorService executorService()
62   {
63     return Executors.newFixedThreadPool(1);
64   }
65
66   @Bean(destroyMethod = "shutdown")
67   TransferConsumer transferConsumer(
68       TransferServiceProperties properties,
69       KafkaConsumer<String, String> consumer,
70       ExecutorService executorService,
71       ObjectMapper mapper,
72       TransferService productionTransferService,
73       TransferService restoreTransferService)
74   {
75     return
76         new TransferConsumer(
77             properties.topic,
78             consumer,
79             executorService,
80             mapper,
81             new TransferConsumer.ConsumerUseCases() {
82               @Override
83               public void create(Transfer transfer)
84               {
85                 productionTransferService.create(transfer);
86               }
87
88               @Override
89               public Optional<Transfer> get(Long id)
90               {
91                 return productionTransferService.get(id);
92               }
93
94               @Override
95               public void handle(Transfer transfer)
96               {
97                 productionTransferService.handle(transfer);
98               }
99             },
100             new TransferConsumer.ConsumerUseCases() {
101               @Override
102               public void create(Transfer transfer)
103               {
104                 restoreTransferService.create(transfer);
105               }
106
107               @Override
108               public Optional<Transfer> get(Long id)
109               {
110                 return restoreTransferService.get(id);
111               }
112
113               @Override
114               public void handle(Transfer transfer)
115               {
116                 restoreTransferService.handle(transfer);
117               }
118             });
119   }
120
121   @Bean
122   KafkaMessagingService kafkaMessagingService(
123       KafkaProducer<String, String> producer,
124       ObjectMapper mapper,
125       TransferServiceProperties properties)
126   {
127     return new KafkaMessagingService(producer, mapper, properties.topic);
128   }
129
130   @Bean
131   TransferService productionTransferService(
132       TransferRepository repository,
133       KafkaMessagingService kafkaMessagingService)
134   {
135     return new TransferService(repository, kafkaMessagingService);
136   }
137
138   @Bean
139   TransferService restoreTransferService(
140       TransferRepository repository,
141       NoOpMessageService noOpMessageService)
142   {
143     return new TransferService(repository, noOpMessageService);
144   }
145
146   @Bean
147   TransferController transferController(
148       TransferService productionTransferService,
149       KafkaMessagingService kafkaMessagingService)
150   {
151     return new TransferController(productionTransferService, kafkaMessagingService);
152   }
153
154
155   public static void main(String[] args)
156   {
157     SpringApplication.run(TransferServiceApplication.class, args);
158   }
159 }