+ await("80 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> receivedRecords.size() == 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
+
+ /** Helper methods for the verification of expectations */
+
+ void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+ {
+ doForCurrentOffsets((tp, offset) ->
+ {
+ Long expected = offsetsToCheck.get(tp) + 1;
+ log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
+ assertThat(offset)
+ .describedAs("Committed offset corresponds to the offset of the consumer")
+ .isEqualTo(expected);
+ });
+ }
+
+ void checkSeenOffsetsForProgress()
+ {
+ // Be sure, that some messages were consumed...!