{
private final Clock clock;
- private volatile State state;
- private volatile List<StateAtTime> history;
+ private volatile RebalancingState rebalancingState;
+ private volatile List<RecordedState> history;
private volatile List<Partition> assignedPartitions = List.of();
{
this.clock = clock;
- state = State.STARTING;
- StateAtTime sat = new StateAtTime(ZonedDateTime.now(clock), state, List.of());
+ rebalancingState = RebalancingState.STARTING;
+ RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of());
history = List.of(sat);
}
if (includeDetails)
{
- healthBuilder.withDetail("status", state);
+ healthBuilder.withDetail("rebalancing_state", rebalancingState);
healthBuilder.withDetail("history", history);
}
partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition())));
Collections.sort(newAssignedPartitions, partitionComparator);
- updateAndRecordState(State.RUNNING, newAssignedPartitions);
+ updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions);
}
public void partitionsRevoked(Collection<TopicPartition> partitions)
List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
- updateAndRecordState(State.REBALANCING, newAssignedPartitions);
+ updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions);
}
public void partitionsLost(Collection<TopicPartition> partitions)
{
- updateAndRecordState(State.FENCED, List.of());
+ updateAndRecordState(RebalancingState.FENCED, List.of());
}
private Health.Builder getHealthBuilder()
{
- return state == State.RUNNING
+ return rebalancingState == RebalancingState.RUNNING
? new Health.Builder().up()
- : new Health.Builder().status(state.name());
+ : new Health.Builder().status(rebalancingState.name());
}
private void updateAndRecordState(
- State newState,
+ RebalancingState newRebalancingState,
List<Partition> newAssignedPartitions)
{
- List<StateAtTime> newHistory = new LinkedList<>();
- newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions));
+ List<RecordedState> newHistory = new LinkedList<>();
+ newHistory.add(new RecordedState(
+ ZonedDateTime.now(clock),
+ newRebalancingState,
+ newAssignedPartitions));
newHistory.addAll(this.history);
if(newHistory.size() > 10)
{
newHistory.removeLast();
}
- this.state = newState;
+ this.rebalancingState = newRebalancingState;
this.assignedPartitions = newAssignedPartitions;
this.history = newHistory;
}
- enum State { STARTING, FENCED, REBALANCING, RUNNING }
+ enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING }
public record Partition(String topic, Integer partition) {}
- public record StateAtTime(ZonedDateTime time, State state, List<Partition> assignedPartitions) {}
+ public record RecordedState(
+ ZonedDateTime time,
+ RebalancingState rebalancingState,
+ List<Partition> assignedPartitions) {}
private final static Comparator<Partition> partitionComparator = (tp1, tp2) ->