import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
lock.lock();
try
{
- ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<String, Long> records =
+ consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
+ // Startzeit merken
for (ConsumerRecord<String, Long> record : records)
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch(RetriableErrorException e)
+ {
+ // Seeking to the offset of the record, that raised the exception, and
+ // leaving the loop afterward, retries the record
+ seekToRecord(record);
+ // TODO: Backoff
+ return;
+ }
+ catch(NonRetriableErrorException e)
+ {
+ skipRecord(record);
+ }
+ // Gesamtzeit kontrolliern
+ // Und ggf. abbrchen, wenn es zu lange dauert...
}
+
+ // Confluent Consumer Config: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
+ // auto.commit.interval.ms -> The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. default 5 secs
+ // max.poll.interval.ms -> If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance - default 5min
+ // max.poll.records -> The maximum number of records returned in a single call to poll() - default 500
}
catch(RecordDeserializationException e)
{
}
}
+ /**
+ * Skips the given record. In other words: seeks the reading position of the
+ * {@link Consumer} to the offset after the given {@link ConsumerRecord}.
+ * The next call to {@link Consumer#poll(Duration)} will read the record that
+ * comes after the given record as next message for the partition, the record
+ * was read from.
+ * @param record The record, the consumer should read next
+ */
+ private void skipRecord(ConsumerRecord<String, Long> record)
+ {
+ skip(record.topic(), record.partition(), record.offset());
+ }
+
+ /**
+ * Skips the record at the given position. In other words: seeks the reading
+ * position of the {@link Consumer} to the offset after the given offset.
+ * The next call to {@link Consumer#poll(Duration)} will read the record that
+ * comes after the given offset as next message for the specified topic and
+ * partition.
+ * @param topic The topic, for that the offset position should be altered.
+ * @param partition The partition, for that the offset position should be altered.
+ * @param offsetToSkip The offset record that should be skipped for the given topic and partition.
+ */
+ private void skip(String topic, int partition, long offsetToSkip)
+ {
+ seek(topic, partition, offsetToSkip + 1);
+ }
+
+ /**
+ * Seeks the reading position of the {@link Consumer} to the given {@link ConsumerRecord}.
+ * The next call to {@link Consumer#poll(Duration)} will read the given record as next
+ * message for the partition, the record was read from.
+ * <strong>Note: </strong> the poll loop ({@link #pollAndHandleRecords()}) has
+ * to be exited after the call to this method to re-read the record in the
+ * next iteration of the endless polling loop.
+ * @param record The record, the consumer should read next
+ */
+ private void seekToRecord(ConsumerRecord<String, Long> record)
+ {
+ seek(record.topic(), record.partition(), record.offset());
+ }
+
+ /**
+ * Seeks the reading position of the {@link Consumer} to the given position.
+ * The next call to {@link Consumer#poll(Duration)} will read the record at
+ * the given offset of the specified partition.
+ * @param topic The topic, for that the offset position should be altered.
+ * @param partition The partition, for that the offset position should be altered.
+ * @param offsetOfNextMessage The offset of the record that should be read next from the given topic and partition.
+ */
+ private void seek(String topic, int partition, long offsetOfNextMessage)
+ {
+ consumer.seek(new TopicPartition(topic, partition), offsetOfNextMessage);
+ }
+
private void handleRecord(
String topic,
Integer partition,