58a3af2d0c2bf7cb1e4e430dee9603f22d0d7bac
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
7 import de.juplo.kafka.payment.transfer.domain.TransferService;
8 import de.juplo.kafka.payment.transfer.ports.MessagingService;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.SpringApplication;
18 import org.springframework.boot.autoconfigure.SpringBootApplication;
19 import org.springframework.boot.context.properties.EnableConfigurationProperties;
20 import org.springframework.context.annotation.Bean;
21
22 import java.util.Properties;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25
26
27 @SpringBootApplication
28 @EnableConfigurationProperties(TransferServiceProperties.class)
29 @Slf4j
30 public class TransferServiceApplication
31 {
32   @Bean(destroyMethod = "close")
33   KafkaProducer<String, String> producer(TransferServiceProperties properties)
34   {
35     Properties props = new Properties();
36     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
37     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
38     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
39
40     return new KafkaProducer<>(props);
41   }
42
43   @Bean
44   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
45   {
46     Properties props = new Properties();
47     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
48     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
49     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
50     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
51
52     return new KafkaConsumer<>(props);
53   }
54
55   @Bean(destroyMethod = "shutdown")
56   ExecutorService executorService()
57   {
58     return Executors.newFixedThreadPool(1);
59   }
60
61   @Bean(destroyMethod = "shutdown")
62   TransferConsumer transferConsumer(
63       TransferServiceProperties properties,
64       KafkaConsumer<String, String> consumer,
65       ExecutorService executorService,
66       ObjectMapper mapper,
67       TransferService transferService)
68   {
69     TransferConsumer transferConsumer =
70         new TransferConsumer(
71             properties.topic,
72             consumer,
73             executorService,
74             mapper,
75             transferService,
76             transferService,
77             transferService);
78     transferConsumer.start();
79     return transferConsumer;
80   }
81
82   @Bean
83   MessagingService kafkaMessagingService(
84       KafkaProducer<String, String> producer,
85       ObjectMapper mapper,
86       TransferServiceProperties properties)
87   {
88     return new KafkaMessagingService(producer, mapper, properties.topic);
89   }
90
91   @Bean
92   TransferService transferService(
93       TransferRepository repository,
94       MessagingService messagingService)
95   {
96     return new TransferService(repository, messagingService);
97   }
98
99
100   public static void main(String[] args)
101   {
102     SpringApplication.run(TransferServiceApplication.class, args);
103   }
104 }