53ff0f4b6673261e7a4b6050f740aa127a483e2a
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
7 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
8 import de.juplo.kafka.payment.transfer.adapter.TransferController;
9 import de.juplo.kafka.payment.transfer.domain.Transfer;
10 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
11 import de.juplo.kafka.payment.transfer.ports.TransferService;
12 import lombok.extern.slf4j.Slf4j;
13 import org.apache.kafka.clients.consumer.ConsumerConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.SpringApplication;
20 import org.springframework.boot.autoconfigure.SpringBootApplication;
21 import org.springframework.boot.context.properties.EnableConfigurationProperties;
22 import org.springframework.context.annotation.Bean;
23 import org.springframework.util.Assert;
24
25 import java.util.Optional;
26 import java.util.Properties;
27
28
29 @SpringBootApplication
30 @EnableConfigurationProperties(TransferServiceProperties.class)
31 @Slf4j
32 public class TransferServiceApplication
33 {
34   @Bean(destroyMethod = "close")
35   KafkaProducer<String, String> producer(TransferServiceProperties properties)
36   {
37     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
38     Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
39
40     Properties props = new Properties();
41     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
42     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
43     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
44
45     return new KafkaProducer<>(props);
46   }
47
48   @Bean
49   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
50   {
51     Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
52     Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
53
54     Properties props = new Properties();
55     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
56     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
57     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
58     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
59     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
60     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
61
62     return new KafkaConsumer<>(props);
63   }
64
65   @Bean(destroyMethod = "shutdown")
66   TransferConsumer transferConsumer(
67       TransferServiceProperties properties,
68       KafkaConsumer<String, String> consumer,
69       ObjectMapper mapper,
70       TransferService productionTransferService,
71       TransferService restoreTransferService)
72   {
73     return
74         new TransferConsumer(
75             properties.getTopic(),
76             consumer,
77             mapper,
78             new TransferConsumer.ConsumerUseCases() {
79               @Override
80               public void create(Long id, Long payer, Long payee, Integer amount)
81               {
82                 productionTransferService.create(id, payer, payee, amount);
83               }
84
85               @Override
86               public Optional<Transfer> get(Long id)
87               {
88                 return productionTransferService.get(id);
89               }
90
91               @Override
92               public void handleStateChange(Long id, Transfer.State state)
93               {
94                 productionTransferService.handleStateChange(id, state);
95               }
96             },
97             new TransferConsumer.ConsumerUseCases() {
98               @Override
99               public void create(Long id, Long payer, Long payee, Integer amount)
100               {
101                 restoreTransferService.create(id, payer, payee, amount);
102               }
103
104               @Override
105               public Optional<Transfer> get(Long id)
106               {
107                 return restoreTransferService.get(id);
108               }
109
110               @Override
111               public void handleStateChange(Long id, Transfer.State state)
112               {
113                 restoreTransferService.handleStateChange(id, state);
114               }
115             });
116   }
117
118   @Bean
119   KafkaMessagingService kafkaMessagingService(
120       KafkaProducer<String, String> producer,
121       ObjectMapper mapper,
122       TransferServiceProperties properties)
123   {
124     return new KafkaMessagingService(producer, mapper, properties.getTopic());
125   }
126
127   @Bean
128   TransferService productionTransferService(
129       TransferRepository repository,
130       KafkaMessagingService kafkaMessagingService)
131   {
132     return new TransferService(repository, kafkaMessagingService);
133   }
134
135   @Bean
136   TransferService restoreTransferService(
137       TransferRepository repository,
138       NoOpMessageService noOpMessageService)
139   {
140     return new TransferService(repository, noOpMessageService);
141   }
142
143   @Bean
144   TransferController transferController(
145       TransferService productionTransferService,
146       KafkaMessagingService kafkaMessagingService)
147   {
148     return new TransferController(productionTransferService, kafkaMessagingService);
149   }
150
151
152   public static void main(String[] args)
153   {
154     SpringApplication.run(TransferServiceApplication.class, args);
155   }
156 }