// Otherwise: Use initial offset, generated by Kafka
consumer.seek(tp, document.offset);
}
- handler.addPartition(partition, document);
+ handler.addPartition(partition, document.state);
});
}
id,
partition,
newOffset);
- repository.save(handler.removePartition(partition));
+ repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
});
}
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
+ handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
new StateDocument(
partiton,
- statistics,
+ sumBusinessLogic.getState(),
consumer.position(new TopicPartition(topic, partiton)))));
lastCommit = clock.instant();
}