+ @Override
+ public void deactivatePartition(int partition, long offset)
+ {
+ data.offsets[partition] = offset;
+ }
+
+ @Override
+ public long storedPosition(int partition)
+ {
+ return data.offsets[partition];
+ }
+
+ @Override
+ public void storeState(Map<Integer, Long> offsets)
+ {
+ offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+ stateStore.ifPresent(file ->
+ {
+ try (
+ FileOutputStream fos = new FileOutputStream(file);
+ ObjectOutputStream oos = new ObjectOutputStream(fos))
+ {
+ oos.writeObject(data);
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "locally stored state for partition {}: position={}, entries={}",
+ i,
+ data.offsets[i],
+ data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+ }
+ catch (IOException e)
+ {
+ log.error("could not write state to store {}: {}", file, e.getMessage());
+ }
+ });
+ }
+
+