- Map<TopicPartition, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
- Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
-
- Set<TopicPartition> withProgress = new HashSet<>();
- partitions().forEach(tp ->
- {
- Long oldOffset = oldOffsets.get(tp);
- Long newOffset = newOffsets.get(tp);
- if (!oldOffset.equals(newOffset))
- {
- log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
- withProgress.add(tp);
- }
- });
- assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");