private final Consumer<String, String> consumer;
private final Thread workerThread;
- private final CounterState counterState = new CounterState();
private final String stateTopic;
Producer<String, String> producer;
private final Phaser phaser = new Phaser(1);
private final Set<TopicPartition> assignedPartitions = new HashSet<>();
private volatile PartitionState[] partitionStates;
+ private CounterState[] counterState;
private volatile long[] stateEndOffsets;
private volatile int[] seen;
private volatile int[] acked;
{
partitionStates[i] = PartitionState.UNASSIGNED;
}
+ counterState = new CounterState[numPartitions];
stateEndOffsets = new long[numPartitions];
seen = new int[numPartitions];
acked = new int[numPartitions];
String key,
String value)
{
- counterState.setCounterState(key, Long.parseLong(value));
+ counterState[partition].setCounterState(key, Long.parseLong(value));
if (offset + 1 == stateEndOffsets[partition])
{
log.info("{} - Restoring of state for partition {} done!", id, partition);
Integer partition,
String key)
{
- Long counter = computeCount(key);
+ Long counter = computeCount(partition, key);
log.info("{} - current value for counter {}: {}", id, key, counter);
sendCounterState(partition, key, counter);
}
- private synchronized Long computeCount(String key)
+ private synchronized Long computeCount(int partition, String key)
{
- return counterState.addToCounter(key);
+ return counterState[partition].addToCounter(key);
}
- public Map<String, Long> getCounterState()
+ public Map<Integer, Map<String, Long>> getCounterState()
{
- return counterState.getCounterState();
+ Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
+ assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+ return result;
}
void sendCounterState(int partition, String key, Long counter)
private void restoreAndAssign(int partition)
{
+ counterState[partition] = new CounterState();
+
TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
long stateEndOffset = consumer
TopicPartition messagePartition = new TopicPartition(topic, partition);
log.info("{} - Revoking partition {}", id, messagePartition);
assignedPartitions.remove(messagePartition);
+ counterState[partition] = null;
phaser.arriveAndDeregister();
log.info(