TransferRepository does not need any synchronization
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
1 package de.juplo.kafka.payment.transfer;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
7 import de.juplo.kafka.payment.transfer.domain.TransferService;
8 import de.juplo.kafka.payment.transfer.ports.MessagingService;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.SpringApplication;
18 import org.springframework.boot.autoconfigure.SpringBootApplication;
19 import org.springframework.boot.context.properties.EnableConfigurationProperties;
20 import org.springframework.context.annotation.Bean;
21
22 import java.util.Properties;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25
26
27 @SpringBootApplication
28 @EnableConfigurationProperties(TransferServiceProperties.class)
29 @Slf4j
30 public class TransferServiceApplication
31 {
32   @Bean(destroyMethod = "close")
33   KafkaProducer<String, String> producer(TransferServiceProperties properties)
34   {
35     Properties props = new Properties();
36     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
37     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
38     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
39
40     return new KafkaProducer<>(props);
41   }
42
43   @Bean
44   KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
45   {
46     Properties props = new Properties();
47     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
48     props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
49     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
50     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
51
52     return new KafkaConsumer<>(props);
53   }
54
55   @Bean(destroyMethod = "shutdown")
56   ExecutorService executorService()
57   {
58     return Executors.newFixedThreadPool(1);
59   }
60
61   @Bean(destroyMethod = "shutdown")
62   TransferConsumer transferConsumer(
63       TransferServiceProperties properties,
64       KafkaConsumer<String, String> consumer,
65       ExecutorService executorService,
66       ObjectMapper mapper,
67       TransferService transferService)
68   {
69     TransferConsumer transferConsumer =
70         new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
71     transferConsumer.start();
72     return transferConsumer;
73   }
74
75   @Bean
76   MessagingService kafkaMessagingService(
77       KafkaProducer<String, String> producer,
78       ObjectMapper mapper,
79       TransferServiceProperties properties)
80   {
81     return new KafkaMessagingService(producer, mapper, properties.topic);
82   }
83
84   @Bean
85   TransferService transferService(
86       TransferRepository repository,
87       MessagingService messagingService)
88   {
89     return new TransferService(repository, messagingService);
90   }
91
92
93   public static void main(String[] args)
94   {
95     SpringApplication.run(TransferServiceApplication.class, args);
96   }
97 }