private volatile boolean running = false;
private final Phaser phaser = new Phaser(1);
private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+ private volatile PartitionState[] partitionStates;
+ private volatile long[] stateEndOffsets;
private volatile int[] seen;
private volatile int[] acked;
private volatile boolean[] done;
log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
int numPartitions = consumer.partitionsFor(topic).size();
log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ partitionStates = new PartitionState[numPartitions];
+ for (int i=0; i<numPartitions; i++)
+ {
+ partitionStates[i] = PartitionState.UNASSIGNED;
+ }
+ stateEndOffsets = new long[numPartitions];
seen = new int[numPartitions];
acked = new int[numPartitions];
done = new boolean[numPartitions];
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), this);
+ consumer.subscribe(Arrays.asList(topic, stateTopic), this);
running = true;
while (running)
int phase = phaser.getPhase();
- log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
assignedPartitions
.forEach(partition ->
{
seen[partition.partition()] = 0;
acked[partition.partition()] = 0;
done[partition.partition()] = false;
+ });
+ log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
+ records
+ .partitions()
+ .forEach(partition ->
+ {
for (ConsumerRecord<String, String> record : records.records(partition))
{
handleRecord(
}
done[partition.partition()] = true;
+ });
+
+ assignedPartitions
+ .forEach(partition ->
+ {
if (seen[partition.partition()] == 0)
{
int arrivedPhase = phaser.arrive();
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+
+ if (topic.equals(this.topic))
+ {
+ handleMessage(partition, key);
+ }
+ else
+ {
+ handleState(partition, offset, key, value);
+ }
+ }
+
+ private synchronized void handleState(
+ int partition,
+ long offset,
+ String key,
+ String value)
+ {
+ counterState.put(key, Long.parseLong(value));
+ if (offset + 1 == stateEndOffsets[partition])
+ {
+ log.info(
+ "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition);
+ partitionStates[partition] = PartitionState.ASSIGNED;
+
+ TopicPartition tp;
+
+ tp = new TopicPartition(stateTopic, partition);
+ log.info("{} - Pausing state partition {}...", id, tp);
+ consumer.pause(List.of(tp));
+
+ tp = new TopicPartition(topic, partition);
+ log.info("{} - Resuming message partition {}...", id, tp);
+ consumer.resume(List.of(tp));
+ }
+ else
+ {
+ log.debug(
+ "{} - Restored state up to offset {}, end-offset: {}, state: {}={}",
+ id,
+ offset,
+ stateEndOffsets[partition],
+ key,
+ value);
+ }
+ }
+
+ private void handleMessage(
+ Integer partition,
+ String key)
+ {
Long counter = computeCount(key);
log.info("{} - current value for counter {}: {}", id, key, counter);
sendCounterState(partition, key, counter);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- assignedPartitions.addAll(partitions);
- phaser.bulkRegister(partitions.size());
- log.info(
- "{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
- id,
- partitions.size(),
- phaser.getRegisteredParties());
+ partitions.forEach(partition ->
+ {
+ if (partition.topic().equals(topic))
+ {
+ log.info("{} - Adding partition {}", id, partition);
+ assignedPartitions.add(partition);
+
+ phaser.register();
+ log.info(
+ "{} - Registered new partie for restored assigned partition {}. New total number of parties: {}",
+ id,
+ partition,
+ phaser.getRegisteredParties());
+
+ log.info(
+ "{} - Changing partition-state for {}: {} -> RESTORING",
+ id,
+ partition,
+ partitionStates[partition.partition()]);
+ partitionStates[partition.partition()] = PartitionState.RESTORING;
+ }
+ else
+ {
+ long endOffset = consumer
+ .endOffsets(List.of(partition))
+ .get(partition)
+ .longValue();
+ log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition);
+ stateEndOffsets[partition.partition()] = endOffset;
+ }
+ });
+
+ assignedPartitions.forEach(messagePartition ->
+ {
+ int partition = messagePartition.partition();
+ TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+
+ long stateBeginningOffset = consumer
+ .beginningOffsets(List.of(statePartition))
+ .get(statePartition);
+
+ if (stateBeginningOffset < stateEndOffsets[partition])
+ {
+ log.info(
+ "{} - Seeking to first offset {} for state partition {}",
+ id,
+ stateBeginningOffset,
+ statePartition);
+ consumer.seek(statePartition, stateBeginningOffset);
+
+ log.info("{} - Pausing message partition {}", id, messagePartition);
+ consumer.pause(List.of(messagePartition));
+
+ log.info("{} - Resuming state partition {}", id, statePartition);
+ consumer.resume(List.of(statePartition));
+ }
+ else
+ {
+ log.info(
+ "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED",
+ id,
+ messagePartition);
+ partitionStates[partition] = PartitionState.ASSIGNED;
+
+ log.info("{} - Pausing state partition {}...", id, statePartition);
+ consumer.pause(List.of(statePartition));
+
+ log.info("{} - Resuming message partition {}...", id, messagePartition);
+ consumer.resume(List.of(messagePartition));
+ }
+ });
}
@Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- assignedPartitions.removeAll(partitions);
- partitions.forEach(partition -> phaser.arriveAndDeregister());
- log.info(
- "{} - Removed {} parties for revoked partitions. New total number of parties: {}",
- id,
- partitions.size(),
- phaser.getRegisteredParties());
+ partitions.forEach(partition ->
+ {
+ if (partition.topic().equals(topic))
+ {
+ log.info("{} - Revoking partition {}", id, partition);
+ assignedPartitions.remove(partition);
+
+ PartitionState partitionState = partitionStates[partition.partition()];
+ switch (partitionState)
+ {
+ case RESTORING:
+ case ASSIGNED:
+ phaser.arriveAndDeregister();
+ log.info(
+ "{} - Deregistered partie for revoked partition {}. New total number of parties: {}",
+ id,
+ partition,
+ phaser.getRegisteredParties());
+ partitionStates[partition.partition()] = PartitionState.UNASSIGNED;
+ break;
+ default:
+ case UNASSIGNED:
+ log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
+ }
+ }
+ });
}
public void shutdown() throws InterruptedException
consumer.wakeup();
workerThread.join();
}
+
+ enum PartitionState
+ {
+ UNASSIGNED,
+ RESTORING,
+ ASSIGNED
+ }
}