+ data.mappings[partitionForId(id)].remove(id);
+ }
+
+ @Override
+ public long activatePartition(int partition)
+ {
+ if (data.offsets[partition] == 0)
+ {
+ // Initialize the state of the partition, if
+ // no corresponding offset is known.
+ if (data.mappings[partition] != null)
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ partition,
+ data.mappings[partition].size());
+
+ data.mappings[partition] = new HashMap<>();
+ }
+
+ return data.offsets[partition];
+ }
+
+ @Override
+ public void deactivatePartition(int partition, long offset)
+ {
+ data.offsets[partition] = offset;
+ }
+
+ @Override
+ public long storedPosition(int partition)
+ {
+ return data.offsets[partition];
+ }
+
+ @Override
+ public void storeState(Map<Integer, Long> offsets)
+ {
+ offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+ for (int i = 0; i < numPartitions; i++)
+ {
+ if (data.offsets[i] == 0 && data.mappings[i] != null)
+ {
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ i,
+ data.mappings[i].size());
+
+ data.mappings[i] = null;
+ }
+ }
+ stateStore.ifPresent(file ->
+ {
+ try (
+ FileOutputStream fos = new FileOutputStream(file);
+ ObjectOutputStream oos = new ObjectOutputStream(fos))
+ {
+ oos.writeObject(data);
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "locally stored state for partition {}: position={}, entries={}",
+ i,
+ data.offsets[i],
+ data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+ }
+ catch (IOException e)
+ {
+ log.error("could not write state to store {}: {}", file, e.getMessage());
+ }
+ });
+ }
+
+
+ private int partitionForId(long id)
+ {
+ String key = Long.toString(id);
+ return TransferPartitioner.computeHashForKey(key, numPartitions);
+ }
+
+
+ static class Data implements Serializable
+ {
+ final long offsets[];
+ final Map<Long, String> mappings[];
+
+ Data(int numPartitions)
+ {
+ offsets = new long[numPartitions];
+ mappings = new Map[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ offsets[i] = 0;
+ }