+ send100Messages((partition, key, counter) ->
+ {
+ Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
+ return new ProducerRecord<>(TOPIC, partition, key, value);
+ });
+
+ await("100 records received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> receivedRecords.size() >= 100);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> endlessConsumer.exitStatus())
+ .describedAs("Consumer should still be running");
+ }
+
+ @Test
+ void commitsOffsetOfErrorForReprocessingOnDeserializationError()
+ {
+ send100Messages((partition, key, counter) ->
+ {
+ Bytes value = counter == 77
+ ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+ : new Bytes(valueSerializer.serialize(TOPIC, counter));
+ return new ProducerRecord<>(TOPIC, partition, key, value);
+ });
+
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+
+ endlessConsumer.start();
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ assertThat(receivedRecords.size())
+ .describedAs("Received not all sent events")
+ .isLessThan(100);
+
+ assertThatNoException()
+ .describedAs("Consumer should not be running")
+ .isThrownBy(() -> endlessConsumer.exitStatus());
+ assertThat(endlessConsumer.exitStatus())
+ .describedAs("Consumer should have exited abnormally")
+ .containsInstanceOf(RecordDeserializationException.class);
+ }