X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=02842e51c97a5cad302145df63f72480b3691a6d;hp=65f683cf35ba12ad7a295856a3a8d6f6c2e1b03f;hb=4467c5240397a47b181106a0ae902ed1b71d0c5d;hpb=540f0c5e8ef2c815d7ff37c7af2e119c448cbb1b diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 65f683c..02842e5 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -3,12 +3,16 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; +import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; import de.juplo.kafka.payment.transfer.domain.TransferService; import de.juplo.kafka.payment.transfer.ports.MessagingService; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -16,6 +20,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @SpringBootApplication @@ -34,6 +40,38 @@ public class TransferServiceApplication return new KafkaProducer<>(props); } + @Bean + KafkaConsumer consumer(TransferServiceProperties properties) + { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new KafkaConsumer<>(props); + } + + @Bean(destroyMethod = "shutdown") + ExecutorService executorService() + { + return Executors.newFixedThreadPool(1); + } + + @Bean(destroyMethod = "shutdown") + TransferConsumer transferConsumer( + TransferServiceProperties properties, + KafkaConsumer consumer, + ExecutorService executorService, + ObjectMapper mapper, + TransferService transferService) + { + TransferConsumer transferConsumer = + new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService); + transferConsumer.start(); + return transferConsumer; + } + @Bean MessagingService kafkaMessagingService( KafkaProducer producer,