{
int partition = partitionForId(transfer.getId());
data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+ // We reset the offset for the state of the modified partition,
+ // because the corresponding offset is not known (yet).
+ data.offsets[partition] = 0;
}
catch (JsonProcessingException e)
{
@Override
public long activatePartition(int partition)
{
+ if (data.offsets[partition] == 0)
+ {
+ // Initialize the state of the partition, if
+ // no corresponding offset is known.
+ if (data.mappings[partition] != null)
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ partition,
+ data.mappings[partition].size());
+
+ data.mappings[partition] = new HashMap<>();
+ }
+
return data.offsets[partition];
}
public void storeState(Map<Integer, Long> offsets)
{
offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+ for (int i = 0; i < numPartitions; i++)
+ {
+ if (data.offsets[i] == 0 && data.mappings[i] != null)
+ {
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ i,
+ data.mappings[i].size());
+
+ data.mappings[i] = null;
+ }
+ }
stateStore.ifPresent(file ->
{
try (
offsets = new long[numPartitions];
mappings = new Map[numPartitions];
for (int i = 0; i < numPartitions; i++)
- {
offsets[i] = 0;
- mappings[i] = new HashMap<>();
- }
}
}
}