1 package de.juplo.kafka.payment.transfer;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.*;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import de.juplo.kafka.payment.transfer.ports.TransferService;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.AdminClient;
12 import org.apache.kafka.clients.admin.AdminClientConfig;
13 import org.apache.kafka.clients.consumer.ConsumerConfig;
14 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.clients.producer.KafkaProducer;
17 import org.apache.kafka.clients.producer.ProducerConfig;
18 import org.apache.kafka.common.serialization.StringDeserializer;
19 import org.apache.kafka.common.serialization.StringSerializer;
20 import org.springframework.boot.SpringApplication;
21 import org.springframework.boot.autoconfigure.SpringBootApplication;
22 import org.springframework.boot.context.properties.EnableConfigurationProperties;
23 import org.springframework.context.annotation.Bean;
24 import org.springframework.util.Assert;
26 import java.util.Optional;
27 import java.util.Properties;
/**
 * Spring Boot application class of the transfer service.
 * <p>
 * Besides being the application's entry point, this class enables the binding
 * of {@link TransferServiceProperties} and declares the Kafka clients and
 * services of the application as beans.
 */
30 @SpringBootApplication
31 @EnableConfigurationProperties(TransferServiceProperties.class)
33 public class TransferServiceApplication
35 @Bean(destroyMethod = "close")
36 AdminClient adminClient(TransferServiceProperties properties)
38 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
40 Properties props = new Properties();
41 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
43 return AdminClient.create(props);
46 @Bean(destroyMethod = "close")
47 KafkaProducer<String, String> producer(TransferServiceProperties properties)
49 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
50 Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set");
51 Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set");
53 Properties props = new Properties();
54 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
55 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
56 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
57 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class);
58 props.put(TransferPartitioner.TOPIC, properties.getTopic());
59 props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions());
61 return new KafkaProducer<>(props);
65 KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
67 Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
68 Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
69 Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
71 Properties props = new Properties();
72 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
73 props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
74 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
75 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
76 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
77 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
78 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
79 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
81 return new KafkaConsumer<>(props);
/**
 * Wires up the {@link TransferConsumer} bean, which is registered with
 * {@code shutdown} as destroy method.
 * <p>
 * Two {@code TransferConsumer.ConsumerUseCases} adapters are defined here.
 * Both forward {@code create}, {@code get} and {@code handleStateChange}
 * unchanged: the first one to {@code productionTransferService}, the second
 * one to {@code restoreTransferService}.
 * <p>
 * NOTE(review): the constructor call itself and some of its arguments are not
 * visible in this excerpt -- confirm the argument order against the
 * constructor of TransferConsumer.
 */
84 @Bean(destroyMethod = "shutdown")
85 TransferConsumer transferConsumer(
86 TransferServiceProperties properties,
87 KafkaConsumer<String, String> consumer,
88 AdminClient adminClient,
89 TransferRepository repository,
91 TransferService productionTransferService,
92 TransferService restoreTransferService)
96 properties.getTopic(),
97 properties.getNumPartitions(),
98 properties.getInstanceIdUriMapping(),
// Use cases that delegate to productionTransferService
103 new TransferConsumer.ConsumerUseCases() {
105 public void create(Long id, Long payer, Long payee, Integer amount)
107 productionTransferService.create(id, payer, payee, amount);
111 public Optional<Transfer> get(Long id)
113 return productionTransferService.get(id);
117 public void handleStateChange(Long id, Transfer.State state)
119 productionTransferService.handleStateChange(id, state);
// Use cases that delegate to restoreTransferService
122 new TransferConsumer.ConsumerUseCases() {
124 public void create(Long id, Long payer, Long payee, Integer amount)
126 restoreTransferService.create(id, payer, payee, amount);
130 public Optional<Transfer> get(Long id)
132 return restoreTransferService.get(id);
136 public void handleStateChange(Long id, Transfer.State state)
138 restoreTransferService.handleStateChange(id, state);
/**
 * Creates the {@link KafkaMessagingService} from the given producer, a
 * {@code mapper} and the configured topic.
 * <p>
 * NOTE(review): the declaration of {@code mapper} is not visible in this
 * excerpt -- presumably an injected ObjectMapper parameter; confirm.
 */
144 KafkaMessagingService kafkaMessagingService(
145 KafkaProducer<String, String> producer,
147 TransferServiceProperties properties)
149 return new KafkaMessagingService(producer, mapper, properties.getTopic());
/**
 * Creates the {@link InMemoryTransferRepository} from the configured number
 * of partitions and a {@code mapper}.
 * <p>
 * NOTE(review): the rest of the parameter list, including the declaration of
 * {@code mapper}, is not visible in this excerpt -- presumably an injected
 * ObjectMapper; confirm.
 */
153 InMemoryTransferRepository inMemoryTransferRepository(
154 TransferServiceProperties properties,
157 return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
161 TransferService productionTransferService(
162 TransferRepository repository,
163 KafkaMessagingService kafkaMessagingService)
165 return new TransferService(repository, kafkaMessagingService);
169 TransferService restoreTransferService(
170 TransferRepository repository,
171 NoOpMessageService noOpMessageService)
173 return new TransferService(repository, noOpMessageService);
177 TransferController transferController(
178 TransferService productionTransferService,
179 KafkaMessagingService kafkaMessagingService,
180 TransferConsumer transferConsumer)
182 return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer);
186 public static void main(String[] args)
188 SpringApplication.run(TransferServiceApplication.class, args);