Simplified the thread-execution
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
7 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
8 import de.juplo.kafka.payment.transfer.adapter.TransferController;
9 import de.juplo.kafka.payment.transfer.domain.Transfer;
10 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
11 import de.juplo.kafka.payment.transfer.ports.TransferService;
12 import lombok.extern.slf4j.Slf4j;
13 import org.apache.kafka.clients.consumer.ConsumerConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.SpringApplication;
20 import org.springframework.boot.autoconfigure.SpringBootApplication;
21 import org.springframework.boot.context.properties.EnableConfigurationProperties;
22 import org.springframework.context.annotation.Bean;
23
24 import java.util.Optional;
25 import java.util.Properties;
26
27
28 @SpringBootApplication
29 @EnableConfigurationProperties(TransferServiceProperties.class)
30 @Slf4j
31 public class TransferServiceApplication
32 {
33   @Bean(destroyMethod = "close")
34   KafkaProducer<String, String> producer(TransferServiceProperties properties)
35   {
36     Properties props = new Properties();
37     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
38     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
39     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
40
41     return new KafkaProducer<>(props);
42   }
43
44   @Bean
45   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
46   {
47     Properties props = new Properties();
48     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
49     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
50     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
51     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
52     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
53     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
54
55     return new KafkaConsumer<>(props);
56   }
57
58   @Bean(destroyMethod = "shutdown")
59   TransferConsumer transferConsumer(
60       TransferServiceProperties properties,
61       KafkaConsumer<String, String> consumer,
62       ObjectMapper mapper,
63       TransferService productionTransferService,
64       TransferService restoreTransferService)
65   {
66     return
67         new TransferConsumer(
68             properties.topic,
69             consumer,
70             mapper,
71             new TransferConsumer.ConsumerUseCases() {
72               @Override
73               public void create(Long id, Long payer, Long payee, Integer amount)
74               {
75                 productionTransferService.create(id, payer, payee, amount);
76               }
77
78               @Override
79               public Optional<Transfer> get(Long id)
80               {
81                 return productionTransferService.get(id);
82               }
83
84               @Override
85               public void handleStateChange(Long id, Transfer.State state)
86               {
87                 productionTransferService.handleStateChange(id, state);
88               }
89             },
90             new TransferConsumer.ConsumerUseCases() {
91               @Override
92               public void create(Long id, Long payer, Long payee, Integer amount)
93               {
94                 restoreTransferService.create(id, payer, payee, amount);
95               }
96
97               @Override
98               public Optional<Transfer> get(Long id)
99               {
100                 return restoreTransferService.get(id);
101               }
102
103               @Override
104               public void handleStateChange(Long id, Transfer.State state)
105               {
106                 restoreTransferService.handleStateChange(id, state);
107               }
108             });
109   }
110
111   @Bean
112   KafkaMessagingService kafkaMessagingService(
113       KafkaProducer<String, String> producer,
114       ObjectMapper mapper,
115       TransferServiceProperties properties)
116   {
117     return new KafkaMessagingService(producer, mapper, properties.topic);
118   }
119
120   @Bean
121   TransferService productionTransferService(
122       TransferRepository repository,
123       KafkaMessagingService kafkaMessagingService)
124   {
125     return new TransferService(repository, kafkaMessagingService);
126   }
127
128   @Bean
129   TransferService restoreTransferService(
130       TransferRepository repository,
131       NoOpMessageService noOpMessageService)
132   {
133     return new TransferService(repository, noOpMessageService);
134   }
135
136   @Bean
137   TransferController transferController(
138       TransferService productionTransferService,
139       KafkaMessagingService kafkaMessagingService)
140   {
141     return new TransferController(productionTransferService, kafkaMessagingService);
142   }
143
144
145   public static void main(String[] args)
146   {
147     SpringApplication.run(TransferServiceApplication.class, args);
148   }
149 }