-
-
- @Override
- public void beforeNextPoll()
- {
- if (!commitsEnabled)
- {
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
- return;
- }
-
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
- new StateDocument(
- partiton,
- adder.getState(),
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
-
- @Override
- public void enableCommits()
- {
- commitsEnabled = true;
- }
-
- @Override
- public void disableCommits()
- {
- commitsEnabled = false;
- }