+ send100Messages((partition, key, counter) ->
+ {
+ Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
+ return new ProducerRecord<>(TOPIC, partition, key, value);
+ });
+
+ await("100 records received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> receivedRecords.size() >= 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> endlessConsumer.exitStatus())
+ .describedAs("Consumer should still be running");
+ }
+
+
+ /** Helper methods for the verification of expectations */