import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
return new KafkaProducer<>(props);
}
- @Bean
- KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
- {
- Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
- Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
- Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- return new KafkaConsumer<>(props);
- }
-
@Bean(destroyMethod = "shutdown")
TransferConsumer transferConsumer(
TransferServiceProperties properties,
- KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
LocalStateStoreSettings localStateStoreSettings,
TransferService productionTransferService,
TransferService restoreTransferService)
{
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+ Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+ Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
+
return
new TransferConsumer(
+ properties.getBootstrapServers(),
+ properties.getGroupId(),
+ properties.getGroupInstanceId(),
properties.getTopic(),
properties.getNumPartitions(),
properties.getInstanceIdUriMapping(),
- consumer,
adminClient,
repository,
Clock.systemDefaultZone(),
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
public TransferConsumer(
+ String bootstrapServers,
+ String groupId,
+ String groupInstanceId,
String topic,
int numPartitions,
Map<String, String> instanceIdUriMapping,
- KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
Clock clock,
ConsumerUseCases productionUseCases,
ConsumerUseCases restoreUseCases)
{
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ this.consumer = new KafkaConsumer<>(props).;
+
+ this.groupId = groupId;
+ this.groupInstanceId = groupInstanceId;
this.topic = topic;
this.numPartitions = numPartitions;
- this.groupId = consumer.groupMetadata().groupId();
- this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
this.instanceIdByPartition = new String[numPartitions];
this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
for (String instanceId : instanceIdUriMapping.keySet())
: instanceIdUriMapping.get(instanceId);
this.instanceIdUriMapping.put(instanceId, uri);
}
- this.consumer = consumer;
this.adminClient = adminClient;
this.repository = repository;
this.clock = clock;
GetTransferUseCase,
CreateTransferUseCase,
HandleStateChangeUseCase {};
+
+ public class Assignor extends CooperativeStickyAssignor
+ {
+ @Override
+ public GroupAssignment assign(
+ Cluster metadata,
+ GroupSubscription groupSubscription)
+ {
+ return super.assign(metadata, groupSubscription);
+ }
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set<String> topics)
+ {
+ return null;
+ }
+
+ @Override
+ public void onAssignment(
+ Assignment assignment,
+ ConsumerGroupMetadata metadata)
+ {
+ log.info("New assignment: {}, {}", assignment, metadata);
+ }
+ }
}