X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2FTransferServiceApplication.java;h=a82f8b109abeaec2ae9106f4e66f311dcb1347ff;hp=53ff0f4b6673261e7a4b6050f740aa127a483e2a;hb=43ea59755f9673864a3ef95250009f091e99a760;hpb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 53ff0f4..a82f8b1 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -2,15 +2,16 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; -import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService; -import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; -import de.juplo.kafka.payment.transfer.adapter.TransferController; +import de.juplo.kafka.payment.transfer.adapter.*; import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferService; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -31,16 +32,31 @@ import java.util.Properties; @Slf4j public class TransferServiceApplication { + @Bean(destroyMethod = "close") + AdminClient adminClient(TransferServiceProperties properties) + { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + + return AdminClient.create(props); + } + @Bean(destroyMethod = "close") KafkaProducer producer(TransferServiceProperties properties) { Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); Assert.hasText(properties.getTopic(), "juplo.transfer.topic must be set"); + Assert.notNull(properties.getNumPartitions(), "juplo.transfer.num-partitions must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TransferPartitioner.class); + props.put(TransferPartitioner.TOPIC, properties.getTopic()); + props.put(TransferPartitioner.NUM_PARTITIONS, properties.getNumPartitions()); return new KafkaProducer<>(props); } @@ -50,10 +66,13 @@ public class TransferServiceApplication { Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); + Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set"); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId()); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -66,6 +85,8 @@ public class TransferServiceApplication TransferConsumer transferConsumer( TransferServiceProperties properties, KafkaConsumer consumer, + AdminClient adminClient, + TransferRepository repository, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) @@ -73,7 +94,11 @@ public class TransferServiceApplication return new TransferConsumer( properties.getTopic(), + properties.getNumPartitions(), + properties.getInstanceIdUriMapping(), consumer, + adminClient, + repository, mapper, new TransferConsumer.ConsumerUseCases() { @Override @@ -124,6 +149,14 @@ public class TransferServiceApplication return new KafkaMessagingService(producer, mapper, properties.getTopic()); } + @Bean + InMemoryTransferRepository inMemoryTransferRepository( + TransferServiceProperties properties, + ObjectMapper mapper) + { + return new InMemoryTransferRepository(properties.getNumPartitions(), mapper); + } + @Bean TransferService productionTransferService( TransferRepository repository, @@ -143,9 +176,10 @@ public class TransferServiceApplication @Bean TransferController transferController( TransferService productionTransferService, - KafkaMessagingService kafkaMessagingService) + KafkaMessagingService kafkaMessagingService, + TransferConsumer transferConsumer) { - return new TransferController(productionTransferService, kafkaMessagingService); + return new TransferController(productionTransferService, kafkaMessagingService, transferConsumer); }