+
+
+ @Override
+ public void beforeNextPoll()
+ {
+ if (!commitsEnabled)
+ {
+ log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+ return;
+ }
+
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ partitions.forEach(partition -> stateRepository.save(
+ new StateDocument(
+ partition,
+ recordHandler.getState(partition).getState(),
+ adderResults.getState(partition),
+ consumer.position(new TopicPartition(topic, partition)))));
+ lastCommit = clock.instant();
+ }
+ }
+
+ @Override
+ public void enableCommits()
+ {
+ commitsEnabled = true;
+ }
+
+ @Override
+ public void disableCommits()
+ {
+ commitsEnabled = false;
+ }