+ @Test
+ void commitsOffsetOnProgramLogicErrorFoo()
+ {
+ recordHandler.testHandler = (record) ->
+ {
+ if (Integer.parseInt(record.value().message)%10 ==0)
+ throw new RuntimeException("BOOM: " + record.value().message + "%10 == 0");
+ };
+
+ send100Messages((key, counter) -> serialize(key, counter));
+
+ await("80 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> receivedRecords.size() == 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
+
+ /** Helper methods for the verification of expectations */
+
+ void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+ {
+ doForCurrentOffsets((tp, offset) ->
+ {
+ Long expected = offsetsToCheck.get(tp) + 1;
+ log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
+ assertThat(offset)
+ .describedAs("Committed offset corresponds to the offset of the consumer")
+ .isEqualTo(expected);
+ });
+ }