From ea9ba21a72d2d0117fccf4e98af97b1e6c135d9f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 15 Nov 2021 22:31:44 +0100 Subject: [PATCH] WIP: instance-mapping from assignor --- .../transfer/TransferServiceApplication.java | 32 +++-------- .../transfer/adapter/TransferConsumer.java | 54 ++++++++++++++++--- 2 files changed, 53 insertions(+), 33 deletions(-) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index a5c1faf..d7d9f3f 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -11,12 +11,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -69,30 +66,9 @@ public class TransferServiceApplication return new KafkaProducer<>(props); } - @Bean - KafkaConsumer consumer(TransferServiceProperties properties) - { - Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); - Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); - Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set"); - - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); - props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId()); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - return new KafkaConsumer<>(props); - } - @Bean(destroyMethod = "shutdown") TransferConsumer transferConsumer( TransferServiceProperties properties, - KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, LocalStateStoreSettings localStateStoreSettings, @@ -100,12 +76,18 @@ public class TransferServiceApplication TransferService productionTransferService, TransferService restoreTransferService) { + Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set"); + Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set"); + Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set"); + return new TransferConsumer( + properties.getBootstrapServers(), + properties.getGroupId(), + properties.getGroupInstanceId(), properties.getTopic(), properties.getNumPartitions(), properties.getInstanceIdUriMapping(), - consumer, adminClient, repository, Clock.systemDefaultZone(), diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 920d79a..c312951 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -10,18 +10,18 @@ import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -61,10 +61,12 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener public TransferConsumer( + String bootstrapServers, + String groupId, + String groupInstanceId, String topic, int numPartitions, Map instanceIdUriMapping, - KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, Clock clock, @@ -73,10 +75,22 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + this.consumer = new KafkaConsumer<>(props).; + + this.groupId = groupId; + this.groupInstanceId = groupInstanceId; this.topic = topic; this.numPartitions = numPartitions; - this.groupId = consumer.groupMetadata().groupId(); - this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get(); this.instanceIdByPartition = new String[numPartitions]; this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size()); for (String instanceId : instanceIdUriMapping.keySet()) @@ -87,7 +101,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener : instanceIdUriMapping.get(instanceId); this.instanceIdUriMapping.put(instanceId, uri); } - this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; this.clock = clock; @@ -438,4 +451,29 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener GetTransferUseCase, CreateTransferUseCase, HandleStateChangeUseCase {}; + + public class Assignor extends CooperativeStickyAssignor + { + @Override + public GroupAssignment assign( + Cluster metadata, + GroupSubscription groupSubscription) + { + return super.assign(metadata, groupSubscription); + } + + @Override + public ByteBuffer subscriptionUserData(Set topics) + { + return null; + } + + @Override + public void onAssignment( + Assignment assignment, + ConsumerGroupMetadata metadata) + { + log.info("New assignment: {}, {}", assignment, metadata); + } + } } -- 2.20.1