WIP: instance-mapping from assignor
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / TransferServiceApplication.java
index a5c1faf..d7d9f3f 100644 (file)
@@ -11,12 +11,9 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -69,30 +66,9 @@ public class TransferServiceApplication
     return new KafkaProducer<>(props);
   }
 
-  @Bean
-  KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
-  {
-    Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
-    Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
-    Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
-
-    Properties props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
-    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-    return new KafkaConsumer<>(props);
-  }
-
   @Bean(destroyMethod = "shutdown")
   TransferConsumer transferConsumer(
       TransferServiceProperties properties,
-      KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
       LocalStateStoreSettings localStateStoreSettings,
@@ -100,12 +76,18 @@ public class TransferServiceApplication
       TransferService productionTransferService,
       TransferService restoreTransferService)
   {
+    Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+    Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+    Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
+
     return
         new TransferConsumer(
+            properties.getBootstrapServers(),
+            properties.getGroupId(),
+            properties.getGroupInstanceId(),
             properties.getTopic(),
             properties.getNumPartitions(),
             properties.getInstanceIdUriMapping(),
-            consumer,
             adminClient,
             repository,
             Clock.systemDefaultZone(),