private final String topic;
private final Clock clock;
private final Duration commitInterval;
- private final Consumer<String, String> consumer;
+ private final Consumer<String, Long> consumer;
private Instant lastCommit = Instant.EPOCH;
private boolean commitsEnabled = true;
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
- recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+ recordHandler.getState().forEach((partiton, state) -> stateRepository.save(
new StateDocument(
partiton,
- adder.getState(),
+ state,
consumer.position(new TopicPartition(topic, partiton)))));
lastCommit = clock.instant();
}