WIP: instance-mapping from assignor
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 920d79a..c312951 100644 (file)
@@ -10,18 +10,18 @@ import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
@@ -61,10 +61,12 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 
 
   public TransferConsumer(
+      String bootstrapServers,
+      String groupId,
+      String groupInstanceId,
       String topic,
       int numPartitions,
       Map<String, String> instanceIdUriMapping,
-      KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
       Clock clock,
@@ -73,10 +75,22 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
       ConsumerUseCases productionUseCases,
       ConsumerUseCases restoreUseCases)
   {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+    this.consumer = new KafkaConsumer<>(props).;
+
+    this.groupId = groupId;
+    this.groupInstanceId = groupInstanceId;
     this.topic = topic;
     this.numPartitions = numPartitions;
-    this.groupId = consumer.groupMetadata().groupId();
-    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
     this.instanceIdByPartition = new String[numPartitions];
     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
     for (String instanceId : instanceIdUriMapping.keySet())
@@ -87,7 +101,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
           : instanceIdUriMapping.get(instanceId);
       this.instanceIdUriMapping.put(instanceId, uri);
     }
-    this.consumer = consumer;
     this.adminClient = adminClient;
     this.repository = repository;
     this.clock = clock;
@@ -438,4 +451,29 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
         GetTransferUseCase,
         CreateTransferUseCase,
         HandleStateChangeUseCase {};
+
+  public class Assignor extends CooperativeStickyAssignor
+  {
+    @Override
+    public GroupAssignment assign(
+        Cluster metadata,
+        GroupSubscription groupSubscription)
+    {
+      return super.assign(metadata, groupSubscription);
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics)
+    {
+      return null;
+    }
+
+    @Override
+    public void onAssignment(
+        Assignment assignment,
+        ConsumerGroupMetadata metadata)
+    {
+      log.info("New assignment: {}, {}", assignment, metadata);
+    }
+  }
 }