public void partitionsAssigned(Collection<TopicPartition> partitions)
{
- this.state = State.RUNNING;
- List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
- partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition())));
- Collections.sort(assignedPartitions, partitionComparator);
- this.assignedPartitions = assignedPartitions;
- addCurrentStateToHistory();
+ List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
+ partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition())));
+ Collections.sort(newAssignedPartitions, partitionComparator);
+
+ updateAndRecordState(State.RUNNING, newAssignedPartitions);
}
public void partitionsRevoked(Collection<TopicPartition> partitions)
{
- this.state = State.REBALANCING;
- List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
- partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
- this.assignedPartitions = assignedPartitions;
- addCurrentStateToHistory();
+ List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
+ partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
+
+ updateAndRecordState(State.REBALANCING, newAssignedPartitions);
}
public void partitionsLost(Collection<TopicPartition> partitions)
{
- this.state = State.FENCED;
- this.assignedPartitions = List.of();
- addCurrentStateToHistory();
+ updateAndRecordState(State.FENCED, List.of());
}
: new Health.Builder().status(state.name());
}
- private void addCurrentStateToHistory()
+ private void updateAndRecordState(
+ State newState,
+ List<Partition> newAssignedPartitions)
{
- List<StateAtTime> history = new LinkedList<>();
- history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
- history.addAll(this.history);
- if(history.size() > 10)
+ List<StateAtTime> newHistory = new LinkedList<>();
+ newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions));
+ newHistory.addAll(this.history);
+ if(newHistory.size() > 10)
{
- history.removeLast();
+ newHistory.removeLast();
}
- this.history = history;
+ this.state = newState;
+ this.assignedPartitions = newAssignedPartitions;
+ this.history = newHistory;
}
enum State { STARTING, FENCED, REBALANCING, RUNNING }