import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import javax.annotation.PreDestroy;
consumer.commitSync();
shutdown();
}
+ catch(RecordDeserializationException e)
+ {
+ TopicPartition tp = e.topicPartition();
+ long offset = e.offset();
+ log.error(
+ "{} - Could not deserialize message on topic {} with offset={}: {}",
+ id,
+ tp,
+ offset,
+ e.getCause().toString());
+
+ consumer.commitSync();
+ shutdown(e);
+ }
catch(Exception e)
{
log.error("{} - Unexpected error: {}", id, e.toString(), e);
@Test
@Order(2)
- void commitsNoOffsetsOnError()
+ void commitsOffsetOfErrorForReprocessingOnError()
{
send100Messages(counter ->
counter == 77
.until(() -> !endlessConsumer.running());
checkSeenOffsetsForProgress();
- compareToCommitedOffsets(oldOffsets);
+ compareToCommitedOffsets(newOffsets);
}