+ private final String groupId;
+ private final String groupInstanceId;
+ private final Map<String, String> instanceIdUriMapping;
+ private final String[] instanceIdByPartition;
+
+ private Clock clock;
+ private int stateStoreInterval;
+
+ private volatile boolean partitionOwnershipUnknown = true;
+
+
+ public TransferConsumer(
+ String topic,
+ int numPartitions,
+ Map<String, String> instanceIdUriMapping,
+ KafkaConsumer<String, String> consumer,
+ AdminClient adminClient,
+ TransferRepository repository,
+ Clock clock,
+ int stateStoreInterval,
+ ObjectMapper mapper,
+ ConsumerUseCases productionUseCases,
+ ConsumerUseCases restoreUseCases)
+ {
+ this.topic = topic;
+ this.numPartitions = numPartitions;
+ this.groupId = consumer.groupMetadata().groupId();
+ this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+ this.instanceIdByPartition = new String[numPartitions];
+ this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+ for (String instanceId : instanceIdUriMapping.keySet())
+ {
+ // Requests are not redirected for the instance itself
+ String uri = instanceId.equals(groupInstanceId)
+ ? null
+ : instanceIdUriMapping.get(instanceId);
+ this.instanceIdUriMapping.put(instanceId, uri);
+ }
+ this.consumer = consumer;
+ this.adminClient = adminClient;
+ this.repository = repository;
+ this.clock = clock;
+ this.stateStoreInterval = stateStoreInterval;
+ this.mapper = mapper;
+ this.productionUseCases = productionUseCases;
+ this.restoreUseCases = restoreUseCases;
+ }
+