private volatile State state;
private volatile List<StateAtTime> history;
- private volatile List<Partition> assignedPartitions;
+ private volatile List<Partition> assignedPartitions = List.of();
public ConsumerHealthIndicator(Clock clock)
public void partitionsAssigned(Collection<TopicPartition> partitions)
{
this.state = State.RUNNING;
- List<Partition> assignedPartitions = new LinkedList<>();
- assignedPartitions.addAll(assignedPartitions);
+ List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition())));
Collections.sort(assignedPartitions, partitionComparator);
this.assignedPartitions = assignedPartitions;
- List<StateAtTime> history = new LinkedList<>();
- history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
- history.addAll(this.history);
- if(history.size() > 10)
- {
- history.removeLast();
- }
- this.history = history;
+ addCurrentStateToHistory();
}
public void partitionsRevoked(Collection<TopicPartition> partitions)
{
this.state = State.REBALANCING;
- List<Partition> assignedPartitions = new LinkedList<>();
- assignedPartitions.addAll(assignedPartitions);
+ List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
this.assignedPartitions = assignedPartitions;
- List<StateAtTime> history = new LinkedList<>();
- history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
- history.addAll(this.history);
- if(history.size() > 10)
- {
- history.removeLast();
- }
- this.history = history;
+ addCurrentStateToHistory();
}
public void partitionsLost(Collection<TopicPartition> partitions)
{
this.state = State.FENCED;
this.assignedPartitions = List.of();
- List<StateAtTime> history = new LinkedList<>();
- history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
- history.addAll(this.history);
- if(history.size() > 10)
- {
- history.removeLast();
- }
- this.history = history;
+ addCurrentStateToHistory();
}
: new Health.Builder().status(state.name());
}
+ private void addCurrentStateToHistory()
+ {
+ List<StateAtTime> history = new LinkedList<>();
+ history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
+ history.addAll(this.history);
+ if(history.size() > 10)
+ {
+ history.removeLast();
+ }
+ this.history = history;
+ }
+
enum State { STARTING, FENCED, REBALANCING, RUNNING }
public record Partition(String topic, Integer partition) {}