TODO:error-handling consumer/spring-consumer--error-handling--start+stop
authorKai Moritz <kai@juplo.de>
Sat, 14 Dec 2024 13:31:24 +0000 (14:31 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 11:19:20 +0000 (12:19 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 17a0264..236cc3d 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
@@ -81,18 +82,42 @@ public class ExampleConsumer implements Runnable
     lock.lock();
     try
     {
-      ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+      ConsumerRecords<String, Long> records =
+        consumer.poll(Duration.ofSeconds(1));
 
       log.info("{} - Received {} messages", id, records.count());
+      // Startzeit merken
       for (ConsumerRecord<String, Long> record : records)
       {
-        handleRecord(
-          record.topic(),
-          record.partition(),
-          record.offset(),
-          record.key(),
-          record.value());
+        try
+        {
+          handleRecord(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            record.key(),
+            record.value());
+        }
+        catch(RetriableErrorException e)
+        {
+          // Seeking to the offset of the record, that raised the exception, and
+          // leaving the loop afterward, retries the record
+          seekToRecord(record);
+          // TODO: Backoff
+          return;
+        }
+        catch(NonRetriableErrorException e)
+        {
+          skipRecord(record);
+        }
+        // Gesamtzeit kontrolliern
+        // Und ggf. abbrchen, wenn es zu lange dauert...
       }
+
+      // Confluent Consumer Config: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
+      // auto.commit.interval.ms -> The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. default 5 secs
+      // max.poll.interval.ms ->  If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance - default 5min
+      // max.poll.records -> The maximum number of records returned in a single call to poll() - default 500
     }
     catch(RecordDeserializationException e)
     {
@@ -109,6 +134,61 @@ public class ExampleConsumer implements Runnable
     }
   }
 
+  /**
+   * Skips the given record. In other words: seeks the reading position of the
+   * {@link Consumer} to the offset after the given {@link ConsumerRecord}.
+   * The next call to {@link Consumer#poll(Duration)} will read the record that
+   * comes after the given record as next message for the partition, the record
+   * was read from.
+   * @param record The record, the consumer should read next
+   */
+  private void skipRecord(ConsumerRecord<String, Long> record)
+  {
+    skip(record.topic(), record.partition(), record.offset());
+  }
+
+  /**
+   * Skips the record at the given position. In other words: seeks the reading
+   * position of the {@link Consumer} to the offset after the given offset.
+   * The next call to {@link Consumer#poll(Duration)} will read the record that
+   * comes after the given offset as next message for the specified topic and
+   * partition.
+   * @param topic The topic, for that the offset position should be altered.
+   * @param partition The partition, for that the offset position should be altered.
+   * @param offsetToSkip The offset record that should be skipped for the given topic and partition.
+   */
+  private void skip(String topic, int partition, long offsetToSkip)
+  {
+    seek(topic, partition, offsetToSkip + 1);
+  }
+
+  /**
+   * Seeks the reading position of the {@link Consumer} to the given {@link ConsumerRecord}.
+   * The next call to {@link Consumer#poll(Duration)} will read the given record as next
+   * message for the partition, the record was read from.
+   * <strong>Note: </strong> the poll loop ({@link #pollAndHandleRecords()}) has
+   * to be exited after the call to this method to re-read the record in the
+   * next iteration of the endless polling loop.
+   * @param record The record, the consumer should read next
+   */
+  private void seekToRecord(ConsumerRecord<String, Long> record)
+  {
+    seek(record.topic(), record.partition(), record.offset());
+  }
+
+  /**
+   * Seeks the reading position of the {@link Consumer} to the given position.
+   * The next call to {@link Consumer#poll(Duration)} will read the record at
+   * the given offset of the specified partition.
+   * @param topic The topic, for that the offset position should be altered.
+   * @param partition The partition, for that the offset position should be altered.
+   * @param offsetOfNextMessage The offset of the record that should be read next from the given topic and partition.
+   */
+  private void seek(String topic, int partition, long offsetOfNextMessage)
+  {
+    consumer.seek(new TopicPartition(topic, partition), offsetOfNextMessage);
+  }
+
   private void handleRecord(
     String topic,
     Integer partition,