WIP: instance-mapping from assignor assignor
authorKai Moritz <kai@juplo.de>
Mon, 15 Nov 2021 21:31:44 +0000 (22:31 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 15 Nov 2021 21:31:44 +0000 (22:31 +0100)
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java

index a5c1faf..d7d9f3f 100644 (file)
@@ -11,12 +11,9 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -69,30 +66,9 @@ public class TransferServiceApplication
     return new KafkaProducer<>(props);
   }
 
-  @Bean
-  KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
-  {
-    Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
-    Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
-    Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
-
-    Properties props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
-    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-    return new KafkaConsumer<>(props);
-  }
-
   @Bean(destroyMethod = "shutdown")
   TransferConsumer transferConsumer(
       TransferServiceProperties properties,
-      KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
       LocalStateStoreSettings localStateStoreSettings,
@@ -100,12 +76,18 @@ public class TransferServiceApplication
       TransferService productionTransferService,
       TransferService restoreTransferService)
   {
+    Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+    Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+    Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
+
     return
         new TransferConsumer(
+            properties.getBootstrapServers(),
+            properties.getGroupId(),
+            properties.getGroupInstanceId(),
             properties.getTopic(),
             properties.getNumPartitions(),
             properties.getInstanceIdUriMapping(),
-            consumer,
             adminClient,
             repository,
             Clock.systemDefaultZone(),
index 920d79a..c312951 100644 (file)
@@ -10,18 +10,18 @@ import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
@@ -61,10 +61,12 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 
 
   public TransferConsumer(
+      String bootstrapServers,
+      String groupId,
+      String groupInstanceId,
       String topic,
       int numPartitions,
       Map<String, String> instanceIdUriMapping,
-      KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
       Clock clock,
@@ -73,10 +75,22 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
       ConsumerUseCases productionUseCases,
       ConsumerUseCases restoreUseCases)
   {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+    this.consumer = new KafkaConsumer<>(props).;
+
+    this.groupId = groupId;
+    this.groupInstanceId = groupInstanceId;
     this.topic = topic;
     this.numPartitions = numPartitions;
-    this.groupId = consumer.groupMetadata().groupId();
-    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
     this.instanceIdByPartition = new String[numPartitions];
     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
     for (String instanceId : instanceIdUriMapping.keySet())
@@ -87,7 +101,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
           : instanceIdUriMapping.get(instanceId);
       this.instanceIdUriMapping.put(instanceId, uri);
     }
-    this.consumer = consumer;
     this.adminClient = adminClient;
     this.repository = repository;
     this.clock = clock;
@@ -438,4 +451,29 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
         GetTransferUseCase,
         CreateTransferUseCase,
         HandleStateChangeUseCase {};
+
+  public class Assignor extends CooperativeStickyAssignor
+  {
+    @Override
+    public GroupAssignment assign(
+        Cluster metadata,
+        GroupSubscription groupSubscription)
+    {
+      return super.assign(metadata, groupSubscription);
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics)
+    {
+      return null;
+    }
+
+    @Override
+    public void onAssignment(
+        Assignment assignment,
+        ConsumerGroupMetadata metadata)
+    {
+      log.info("New assignment: {}, {}", assignment, metadata);
+    }
+  }
 }