+ send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i)));
+
+ await("100 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> receivedRecords.size() >= 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> endlessConsumer.exitStatus())
+ .describedAs("Consumer should still be running");
+ }
+
+
+ /** Helper methods for the verification of expectations */