@RequiredArgsConstructor
@Slf4j
-public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class ApplicationRebalanceListener implements RebalanceListener
{
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final String topic;
private final Clock clock;
private final Duration commitInterval;
- private final Consumer<String, String> consumer;
+ private final Consumer consumer;
private final Set<Integer> partitions = new HashSet<>();
}
else
{
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+ log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit);
}
});
}
{
if (!commitsEnabled)
{
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+ log.info("{} - Offset commits are disabled! Last commit: {}", id, lastCommit);
return;
}
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ log.debug("{} - Storing data and offsets, last commit: {}", id, lastCommit);
partitions.forEach(partition -> stateRepository.save(
new StateDocument(
partition,