1 package de.juplo.kafka.payment.transfer;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
6 import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
7 import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
8 import de.juplo.kafka.payment.transfer.adapter.TransferController;
9 import de.juplo.kafka.payment.transfer.domain.Transfer;
10 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
11 import de.juplo.kafka.payment.transfer.ports.TransferService;
12 import lombok.extern.slf4j.Slf4j;
13 import org.apache.kafka.clients.consumer.ConsumerConfig;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.producer.KafkaProducer;
16 import org.apache.kafka.clients.producer.ProducerConfig;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.apache.kafka.common.serialization.StringSerializer;
19 import org.springframework.boot.SpringApplication;
20 import org.springframework.boot.autoconfigure.SpringBootApplication;
21 import org.springframework.boot.context.properties.EnableConfigurationProperties;
22 import org.springframework.context.annotation.Bean;
23 import org.springframework.util.Assert;
25 import java.util.Optional;
26 import java.util.Properties;
29 @SpringBootApplication
30 @EnableConfigurationProperties(TransferServiceProperties.class)
32 public class TransferServiceApplication
34 @Bean(destroyMethod = "close")
35 KafkaProducer<String, String> producer(TransferServiceProperties properties)
37 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
38 Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
40 Properties props = new Properties();
41 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
42 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
43 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
45 return new KafkaProducer<>(props);
49 KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
51 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
52 Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
54 Properties props = new Properties();
55 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
56 props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
57 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
58 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
59 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
60 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
62 return new KafkaConsumer<>(props);
65 @Bean(destroyMethod = "shutdown")
66 TransferConsumer transferConsumer(
67 TransferServiceProperties properties,
68 KafkaConsumer<String, String> consumer,
70 TransferService productionTransferService,
71 TransferService restoreTransferService)
75 properties.getTopic(),
78 new TransferConsumer.ConsumerUseCases() {
80 public void create(Long id, Long payer, Long payee, Integer amount)
82 productionTransferService.create(id, payer, payee, amount);
86 public Optional<Transfer> get(Long id)
88 return productionTransferService.get(id);
92 public void handleStateChange(Long id, Transfer.State state)
94 productionTransferService.handleStateChange(id, state);
97 new TransferConsumer.ConsumerUseCases() {
99 public void create(Long id, Long payer, Long payee, Integer amount)
101 restoreTransferService.create(id, payer, payee, amount);
105 public Optional<Transfer> get(Long id)
107 return restoreTransferService.get(id);
111 public void handleStateChange(Long id, Transfer.State state)
113 restoreTransferService.handleStateChange(id, state);
119 KafkaMessagingService kafkaMessagingService(
120 KafkaProducer<String, String> producer,
122 TransferServiceProperties properties)
124 return new KafkaMessagingService(producer, mapper, properties.getTopic());
128 TransferService productionTransferService(
129 TransferRepository repository,
130 KafkaMessagingService kafkaMessagingService)
132 return new TransferService(repository, kafkaMessagingService);
136 TransferService restoreTransferService(
137 TransferRepository repository,
138 NoOpMessageService noOpMessageService)
140 return new TransferService(repository, noOpMessageService);
144 TransferController transferController(
145 TransferService productionTransferService,
146 KafkaMessagingService kafkaMessagingService)
148 return new TransferController(productionTransferService, kafkaMessagingService);
152 public static void main(String[] args)
154 SpringApplication.run(TransferServiceApplication.class, args);