- log.info("--> starting restore...");
-
- Map<Integer, Long> lastSeen =
- consumer
- .endOffsets(partitions)
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey().partition(),
- entry -> entry.getValue() - 1));
-
- Map<Integer, Long> positions =
- lastSeen
- .keySet()
- .stream()
- .collect(Collectors.toMap(
- partition -> partition,
- partition -> repository.storedPosition(partition)));
-
- while (
- positions
- .entrySet()
- .stream()
- .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
- .reduce(false, (a, b) -> a || b))