From 720f060c12501ac3c5fe3a8c89a1efd046e24947 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Dec 2024 14:31:24 +0100 Subject: [PATCH] TODO:error-handling --- .../java/de/juplo/kafka/ExampleConsumer.java | 94 +++++++++++++++++-- 1 file changed, 87 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 17a02642..236cc3df 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; @@ -81,18 +82,42 @@ public class ExampleConsumer implements Runnable lock.lock(); try { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = + consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); + // Startzeit merken for (ConsumerRecord record : records) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch(RetriableErrorException e) + { + // Seeking to the offset of the record, that raised the exception, and + // leaving the loop afterward, retries the record + seekToRecord(record); + // TODO: Backoff + return; + } + catch(NonRetriableErrorException e) + { + skipRecord(record); + } + // Gesamtzeit kontrolliern + // Und ggf. abbrchen, wenn es zu lange dauert... } + + // Confluent Consumer Config: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html + // auto.commit.interval.ms -> The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. default 5 secs + // max.poll.interval.ms -> If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance - default 5min + // max.poll.records -> The maximum number of records returned in a single call to poll() - default 500 } catch(RecordDeserializationException e) { @@ -109,6 +134,61 @@ public class ExampleConsumer implements Runnable } } + /** + * Skips the given record. In other words: seeks the reading position of the + * {@link Consumer} to the offset after the given {@link ConsumerRecord}. + * The next call to {@link Consumer#poll(Duration)} will read the record that + * comes after the given record as next message for the partition, the record + * was read from. + * @param record The record, the consumer should read next + */ + private void skipRecord(ConsumerRecord record) + { + skip(record.topic(), record.partition(), record.offset()); + } + + /** + * Skips the record at the given position. In other words: seeks the reading + * position of the {@link Consumer} to the offset after the given offset. + * The next call to {@link Consumer#poll(Duration)} will read the record that + * comes after the given offset as next message for the specified topic and + * partition. + * @param topic The topic, for that the offset position should be altered. + * @param partition The partition, for that the offset position should be altered. + * @param offsetToSkip The offset record that should be skipped for the given topic and partition. + */ + private void skip(String topic, int partition, long offsetToSkip) + { + seek(topic, partition, offsetToSkip + 1); + } + + /** + * Seeks the reading position of the {@link Consumer} to the given {@link ConsumerRecord}. + * The next call to {@link Consumer#poll(Duration)} will read the given record as next + * message for the partition, the record was read from. + * Note: the poll loop ({@link #pollAndHandleRecords()}) has + * to be exited after the call to this method to re-read the record in the + * next iteration of the endless polling loop. + * @param record The record, the consumer should read next + */ + private void seekToRecord(ConsumerRecord record) + { + seek(record.topic(), record.partition(), record.offset()); + } + + /** + * Seeks the reading position of the {@link Consumer} to the given position. + * The next call to {@link Consumer#poll(Duration)} will read the record at + * the given offset of the specified partition. + * @param topic The topic, for that the offset position should be altered. + * @param partition The partition, for that the offset position should be altered. + * @param offsetOfNextMessage The offset of the record that should be read next from the given topic and partition. + */ + private void seek(String topic, int partition, long offsetOfNextMessage) + { + consumer.seek(new TopicPartition(topic, partition), offsetOfNextMessage); + } + private void handleRecord( String topic, Integer partition, -- 2.20.1