+ send100Messages((key, counter) -> serializeAsClientMessage(key, counter));
+
+ await("100 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> received.size() == 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
+
+ @Test
+ void mixedMessages()
+ {
+ send100Messages((key, counter) ->
+ counter%3 == 0
+ ? serializeAsGreeting(key)
+ : serializeAsClientMessage(key, counter));