import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
return new KafkaProducer<>(props);
}
- @Bean
- KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
- {
- Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
- Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
- Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- return new KafkaConsumer<>(props);
- }
-
@Bean(destroyMethod = "shutdown")
TransferConsumer transferConsumer(
TransferServiceProperties properties,
- KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
LocalStateStoreSettings localStateStoreSettings,
TransferService productionTransferService,
TransferService restoreTransferService)
{
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+ Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+ Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
+
return
new TransferConsumer(
+ properties.getBootstrapServers(),
+ properties.getGroupId(),
+ properties.getGroupInstanceId(),
properties.getTopic(),
properties.getNumPartitions(),
properties.getInstanceIdUriMapping(),
- consumer,
adminClient,
repository,
Clock.systemDefaultZone(),