-
-
- @Override
- public void beforeNextPoll()
- {
- if (!commitsEnabled)
- {
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
- return;
- }
-
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- partitions.forEach(partition -> stateRepository.save(
- new StateDocument(
- partition,
- recordHandler.getState(partition).getState(),
- adderResults.getState(partition),
- consumer.position(new TopicPartition(topic, partition)))));
- lastCommit = clock.instant();
- }
- }
-
- @Override
- public void enableCommits()
- {
- commitsEnabled = true;
- }
-
- @Override
- public void disableCommits()
- {
- commitsEnabled = false;
- }