From: Kai Moritz Date: Sat, 14 Dec 2024 13:31:24 +0000 (+0100) Subject: GREEN: Erweitertes Error-Handling implementiert X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=fe2c23a834cdfb48b91c9c86974ecbfb16f337be;p=demos%2Fkafka%2Ftraining GREEN: Erweitertes Error-Handling implementiert --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 16a81a1f..38b848e1 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,25 +2,36 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; +import java.util.Collection; +import java.util.List; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { private final String id; private final String topic; private final Consumer consumer; private final RecordHandler recordHandler; private final Thread workerThread; + private final Clock clock; + private final Duration maxPollInterval; + private final Duration minTimeForNextRecord; + private final BackOff backOffStrategy; + private final BackOffState[] backOffState; private final Runnable closeCallback; private volatile boolean running = false; @@ -42,6 +53,14 @@ public class ExampleConsumer implements Runnable this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; + this.clock = clock; + this.maxPollInterval = maxPollInterval; + this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2); + this.backOffStrategy = backOffStrategy; + + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + this.backOffState = new BackOffState[numPartitions]; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -56,7 +75,7 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), this); running = true; while (running) @@ -67,14 +86,100 @@ public class ExampleConsumer implements Runnable consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + + Instant deadline = clock.instant().plus(maxPollInterval); + boolean abortCurrentPoll = false; + + for (TopicPartition topicPartition : records.partitions()) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + if (backOffState[topicPartition.partition()].isWaitingForNextRetry()) + { + log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition); + consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset); + continue; + } + + List> recordsForPartition = records.records(topicPartition); + log.debug( + "{} - Received {} messages for partition {}", + id, + recordsForPartition.size(), + topicPartition); + + boolean partitionHasRetryableError = false; + + for (ConsumerRecord record : recordsForPartition) + { + if (abortCurrentPoll) + { + consumer.seek(topicPartition, record.offset()); + break; + } + + Instant now = clock.instant(); + Duration timeLeft = Duration.between(now, deadline); + + if (timeLeft.minus(minTimeForNextRecord).isNegative()) + { + log.info( + "{} - Aborting record handling, because only {} are left until the poll-interval expires!", + id, + timeLeft); + abortCurrentPoll = true; + consumer.seek(topicPartition, record.offset()); + break; + } + + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (RetriableErrorException e) + { + // Seeking to the offset of the record, that raised the exception, and + // leaving the loop afterwards, retries the record + int partition = topicPartition.partition(); + if (!backOffState[partition].isRetryInProgress(record.offset())) + { + backOffState[partition] = new BackOffState(topicPartition, record.offset()); + partitionHasRetryableError = true; + consumer.seek(topicPartition, record.offset()); + break; + } + else + { + if (backOffState[partition].isUnsuccessful()) + { + log.warn( + "{} - Ignoring retryable error after {} attempts and {}", + id, + backOffState[partition].getNumRetries(), + backOffState[partition].getDurationSpendRetrying()); + } + else + { + consumer.seek(topicPartition, record.offset()); + partitionHasRetryableError = true; + break; + } + } + } + catch (NonRetriableErrorException e) + { + // Just ignore, to skip + log.warn("{} - Ignoring non-retryable error!", id, e); + } + + if (!partitionHasRetryableError) + { + backOffState[topicPartition.partition()].markRetryAsSuccessful(); + } + } } } catch(RecordDeserializationException e) @@ -116,6 +221,18 @@ public class ExampleConsumer implements Runnable } + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> backOffState[tp.partition()] = new BackOffState()); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + } + + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); @@ -123,4 +240,106 @@ public class ExampleConsumer implements Runnable consumer.wakeup(); workerThread.join(); } + + + private class BackOffState + { + private final TopicPartition topicPartition; + private final long offset; + private final Instant startTime; + + private BackOffExecution backOffExecution; + private int numRetries = 0; + private Instant timeNextRetryIsDue; + + + BackOffState() + { + topicPartition = null; + offset = 0; + startTime = null; + } + + BackOffState(TopicPartition topicPartition, long offset) + { + this.topicPartition = topicPartition; + this.offset = offset; + this.startTime = clock.instant(); + + log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); + backOffExecution = backOffStrategy.start(); + initializeNextBackOff(); + } + + boolean isWaitingForNextRetry() + { + if (backOffExecution == null) + { + return false; + } + + if (clock.instant().isAfter(timeNextRetryIsDue)) + { + numRetries++; + initializeNextBackOff(); + log.info("{} - Retrying {}", id, topicPartition); + return false; + } + else + { + log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue); + return true; + } + } + + boolean isRetryInProgress(long offset) + { + return this.offset == offset && timeNextRetryIsDue != null; + } + + boolean isUnsuccessful() + { + return backOffExecution == null && timeNextRetryIsDue != null; + } + + int getNumRetries() + { + return numRetries; + } + + Duration getDurationSpendRetrying() + { + return Duration.between(startTime, timeNextRetryIsDue); + } + + void markRetryAsSuccessful() + { + if (backOffExecution != null) + { + log.info( + "{} - retry #{} for {} succeeded after {}", + id, + numRetries, + topicPartition, + Duration.between(startTime, clock.instant())); + + backOffExecution = null; + timeNextRetryIsDue = null; + } + } + + private void initializeNextBackOff() + { + long backOffMillis = backOffExecution.nextBackOff(); + + if (backOffMillis == BackOffExecution.STOP) + { + backOffExecution = null; + } + else + { + timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); + } + } + } }