+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
+ @Test
+ void commitsOffsetOnProgramLogicErrorFoo()
+ {
+ clientMessageHandler.testHandler = (clientMessage, metadata) ->
+ {
+ if (Integer.parseInt(clientMessage.message)%10 ==0)
+ throw new RuntimeException("BOOM: " + clientMessage.message + "%10 == 0");
+ };
+
+ send100Messages((key, counter) -> serialize(key, counter));
+
+ await("80 records received")