+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ // UNSCHÖN:
+ // Funktioniert nur, weil nach der Nachrichten, die den
+ // Deserialisierungs-Fehler auslöst noch valide Nachrichten
+ // gelesen werden.
+ // GRUND:
+ // Der MessageHandler sieht den Offset der Fehlerhaften
+ // Nachricht nicht!
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ }
+
+ @Test
+ void commitsOffsetOnProgramLogicErrorFoo()
+ {
+ recordHandler.testHandler = (record) ->
+ {
+ if (Integer.parseInt(record.value().message)%10 ==0)
+ throw new RuntimeException("BOOM: " + record.value().message + "%10 == 0");
+ };
+
+ send100Messages((key, counter) -> serialize(key, counter));
+
+ await("80 records received")
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> receivedRecords.size() == 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();