1 package de.juplo.kafka.payment.transfer;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
7 import de.juplo.kafka.payment.transfer.domain.TransferService;
8 import de.juplo.kafka.payment.transfer.ports.MessagingService;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerConfig;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.KafkaProducer;
14 import org.apache.kafka.clients.producer.ProducerConfig;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16 import org.apache.kafka.common.serialization.StringSerializer;
17 import org.springframework.boot.SpringApplication;
18 import org.springframework.boot.autoconfigure.SpringBootApplication;
19 import org.springframework.boot.context.properties.EnableConfigurationProperties;
20 import org.springframework.context.annotation.Bean;
22 import java.util.Properties;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
/**
 * Spring Boot entry point of the transfer service.
 *
 * <p>The class is annotated as a Spring Boot application and enables binding of
 * {@code TransferServiceProperties} as configuration properties. Its methods declare the
 * Kafka producer/consumer, the executor and the service objects of the application.
 */
27 @SpringBootApplication
28 @EnableConfigurationProperties(TransferServiceProperties.class)
30 public class TransferServiceApplication
32 @Bean(destroyMethod = "close")
33 KafkaProducer<String, String> producer(TransferServiceProperties properties)
35 Properties props = new Properties();
36 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
37 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
38 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
40 return new KafkaProducer<>(props);
44 KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
46 Properties props = new Properties();
47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
48 props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
49 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
50 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
52 return new KafkaConsumer<>(props);
55 @Bean(destroyMethod = "shutdown")
56 ExecutorService executorService()
58 return Executors.newFixedThreadPool(1);
// Declares the TransferConsumer bean. The bean is registered with
// destroyMethod = "shutdown", and start() is invoked on the instance before it is
// handed back to the container.
//
// Parameters visible here: the service properties, the Kafka consumer, the executor
// service and the transfer service.
61 @Bean(destroyMethod = "shutdown")
62 TransferConsumer transferConsumer(
63 TransferServiceProperties properties,
64 KafkaConsumer<String, String> consumer,
65 ExecutorService executorService,
67 TransferService transferService)
// NOTE(review): the right-hand side of the following assignment is not present in this
// view, so the arguments passed to the TransferConsumer constructor cannot be confirmed
// here - TODO restore from the original source.
69 TransferConsumer transferConsumer =
// The consumer is started eagerly, as part of bean creation.
78 transferConsumer.start();
79 return transferConsumer;
83 MessagingService kafkaMessagingService(
84 KafkaProducer<String, String> producer,
86 TransferServiceProperties properties)
88 return new KafkaMessagingService(producer, mapper, properties.topic);
92 TransferService transferService(
93 TransferRepository repository,
94 MessagingService messagingService)
96 return new TransferService(repository, messagingService);
100 public static void main(String[] args)
102 SpringApplication.run(TransferServiceApplication.class, args);