private volatile boolean running = false;
private final Phaser phaser = new Phaser(1);
private final Set<TopicPartition> assignedPartitions = new HashSet<>();
- private volatile PartitionState[] partitionStates;
+ private volatile State[] partitionStates;
private Map<String, Long>[] restoredState;
private CounterState[] counterState;
private volatile long[] stateEndOffsets;
log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
int numPartitions = consumer.partitionsFor(topic).size();
log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
- partitionStates = new PartitionState[numPartitions];
+ partitionStates = new State[numPartitions];
for (int i=0; i<numPartitions; i++)
{
- partitionStates[i] = PartitionState.UNASSIGNED;
+ partitionStates[i] = State.UNASSIGNED;
}
restoredState = new Map[numPartitions];
counterState = new CounterState[numPartitions];
private void revoke(int partition)
{
- PartitionState partitionState = partitionStates[partition];
+ State partitionState = partitionStates[partition];
switch (partitionState)
{
case RESTORING, ASSIGNED -> stateUnassigned(partition);
id,
partition,
partitionStates[partition]);
- partitionStates[partition] = PartitionState.RESTORING;
+ partitionStates[partition] = State.RESTORING;
TopicPartition messagePartition = new TopicPartition(this.topic, partition);
log.info("{} - Pausing message partition {}", id, messagePartition);
partition,
partitionStates[partition]);
- partitionStates[partition] = PartitionState.ASSIGNED;
+ partitionStates[partition] = State.ASSIGNED;
TopicPartition statePartition = new TopicPartition(stateTopic, partition);
log.info("{} - Pausing state partition {}...", id, statePartition);
private void stateUnassigned(int partition)
{
- PartitionState oldPartitionState = partitionStates[partition];
+ State oldPartitionState = partitionStates[partition];
log.info(
"{} - State-change for partition {}: {} -> UNASSIGNED",
partition,
oldPartitionState);
- partitionStates[partition] = PartitionState.UNASSIGNED;
+ partitionStates[partition] = State.UNASSIGNED;
- if (oldPartitionState == PartitionState.ASSIGNED)
+ if (oldPartitionState == State.ASSIGNED)
{
TopicPartition messagePartition = new TopicPartition(topic, partition);
log.info("{} - Revoking partition {}", id, messagePartition);
workerThread.join();
}
- enum PartitionState
+ enum State
{
UNASSIGNED,
RESTORING,