import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
-import java.util.Map;
@RequiredArgsConstructor
Integer partition = tp.partition();
Long offset = consumer.position(tp);
log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- StatisticsDocument document =
+ StateDocument document =
repository
.findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
+ .orElse(new StateDocument(partition));
if (document.offset >= 0)
{
// Only seek, if a stored offset was found
// Otherwise: Use initial offset, generated by Kafka
consumer.seek(tp, document.offset);
}
- handler.addPartition(partition, document.statistics);
+ handler.addPartition(partition, document.state);
});
}
id,
partition,
newOffset);
- Map<String, Map<String, Long>> removed = handler.removePartition(partition);
- repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+ repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
});
}
if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
+ handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
+ new StateDocument(
partiton,
- statistics,
+ sumBusinessLogic.getState(),
consumer.position(new TopicPartition(topic, partiton)))));
lastCommit = clock.instant();
}