{
send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
+ Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
+ Map<TopicPartition, Long> newOffsets = runEndlessConsumer(record ->
{
received.add(record);
if (received.size() == 100)
throw new WakeupException();
});
- check(offsets);
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp);
+ Long newOffset = newOffsets.get(tp);
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+
+ check(newOffsets);
}
@Test
doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp);
+ Long newOffset = newOffsets.get(tp);
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+
check(oldOffsets);
}