counterState.setCounterState(key, Long.parseLong(value));
if (offset + 1 == stateEndOffsets[partition])
{
- log.info(
- "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition);
- partitionStates[partition] = PartitionState.ASSIGNED;
-
- TopicPartition tp;
-
- tp = new TopicPartition(stateTopic, partition);
- log.info("{} - Pausing state partition {}...", id, tp);
- consumer.pause(List.of(tp));
-
- tp = new TopicPartition(topic, partition);
- log.info("{} - Resuming message partition {}...", id, tp);
- consumer.resume(List.of(tp));
+ log.info("{} - Restoring of state for partition {} done!", id, partition);
+ stateAssigned(partition);
}
else
{
}
else
{
- log.info(
- "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED",
- id,
- messagePartition);
- partitionStates[partition] = PartitionState.ASSIGNED;
-
- log.info("{} - Pausing state partition {}...", id, statePartition);
- consumer.pause(List.of(statePartition));
-
- log.info("{} - Resuming message partition {}...", id, messagePartition);
- consumer.resume(List.of(messagePartition));
+ log.info("{} - State is up-to-date for message partition {}", id, messagePartition);
+ stateAssigned(partition);
}
});
}
});
}
+ private void stateAssigned(int partition)
+ {
+ log.info(
+ "{} - State-change for partition {}: {} -> ASSIGNED",
+ id,
+ partition,
+ partitionStates[partition]);
+
+ partitionStates[partition] = PartitionState.ASSIGNED;
+
+ TopicPartition statePartition = new TopicPartition(stateTopic, partition);
+ log.info("{} - Pausing state partition {}...", id, statePartition);
+ consumer.pause(List.of(statePartition));
+
+ TopicPartition messagePartition = new TopicPartition(topic, partition);
+ log.info("{} - Resuming message partition {}...", id, messagePartition);
+ consumer.resume(List.of(messagePartition));
+ }
+
public void shutdown() throws InterruptedException
{
log.info("{} joining the worker-thread...", id);