import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
@Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
{
private final String id;
private final String topic;
private final Consumer<String, Long> consumer;
private final RecordHandler<String, Long> recordHandler;
private final Thread workerThread;
+ private final Clock clock;
+ private final Duration maxPollInterval;
+ private final Duration minTimeForNextRecord;
+ private final BackOff backOffStrategy;
+ private final BackOffState[] backOffState;
private final Runnable closeCallback;
private volatile boolean running = false;
this.topic = topic;
this.consumer = consumer;
this.recordHandler = recordHandler;
+ this.clock = clock;
+ this.maxPollInterval = maxPollInterval;
+ this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
+ this.backOffStrategy = backOffStrategy;
+
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ this.backOffState = new BackOffState[numPartitions];
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (running)
consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Long> record : records)
+
+ Instant deadline = clock.instant().plus(maxPollInterval);
+ boolean abortCurrentPoll = false;
+
+ for (TopicPartition topicPartition : records.partitions())
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
+ if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+ {
+ log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+ consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset);
+ continue;
+ }
+
+ List<ConsumerRecord<String, Long>> recordsForPartition = records.records(topicPartition);
+ log.debug(
+ "{} - Received {} messages for partition {}",
+ id,
+ recordsForPartition.size(),
+ topicPartition);
+
+ boolean partitionHasRetryableError = false;
+
+ for (ConsumerRecord<String, Long> record : recordsForPartition)
+ {
+ if (abortCurrentPoll)
+ {
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ Instant now = clock.instant();
+ Duration timeLeft = Duration.between(now, deadline);
+
+ if (timeLeft.minus(minTimeForNextRecord).isNegative())
+ {
+ log.info(
+ "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+ id,
+ timeLeft);
+ abortCurrentPoll = true;
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+
+ try
+ {
+ handleRecord(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.key(),
+ record.value());
+ }
+ catch (RetriableErrorException e)
+ {
+ // Seeking to the offset of the record, that raised the exception, and
+ // leaving the loop afterwards, retries the record
+ int partition = topicPartition.partition();
+ if (!backOffState[partition].isRetryInProgress(record.offset()))
+ {
+ backOffState[partition] = new BackOffState(topicPartition, record.offset());
+ partitionHasRetryableError = true;
+ consumer.seek(topicPartition, record.offset());
+ break;
+ }
+ else
+ {
+ if (backOffState[partition].isUnsuccessful())
+ {
+ log.warn(
+ "{} - Ignoring retryable error after {} attempts and {}",
+ id,
+ backOffState[partition].getNumRetries(),
+ backOffState[partition].getDurationSpendRetrying());
+ }
+ else
+ {
+ consumer.seek(topicPartition, record.offset());
+ partitionHasRetryableError = true;
+ break;
+ }
+ }
+ }
+ catch (NonRetriableErrorException e)
+ {
+ // Just ignore, to skip
+ log.warn("{} - Ignoring non-retryable error!", id, e);
+ }
+
+ if (!partitionHasRetryableError)
+ {
+ backOffState[topicPartition.partition()].markRetryAsSuccessful();
+ }
+ }
}
}
catch(RecordDeserializationException e)
}
+ /**
+  * Rebalance callback: installs a fresh, idle {@link BackOffState} for every
+  * partition contained in the given collection.
+  *
+  * @param partitions the partitions reported as assigned
+  */
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+   for (TopicPartition assigned : partitions)
+   {
+     // The array is indexed by partition number
+     backOffState[assigned.partition()] = new BackOffState();
+   }
+ }
+
+ /**
+  * Rebalance callback for revoked partitions. This implementation does nothing.
+  *
+  * @param partitions the partitions reported as revoked (unused)
+  */
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+   // Intentionally empty
+ }
+
+
/**
 * Requests the consumer to wake up and then blocks until the worker thread
 * has terminated.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting for the worker thread to finish
 */
public void shutdown() throws InterruptedException
{
  log.info("{} joining the worker-thread...", id);

  // Wake the consumer up first, then wait for the worker thread to end
  consumer.wakeup();
  workerThread.join();
}
+
+
+ /**
+  * Retry/back-off bookkeeping for a single partition.
+  * <p>
+  * An instance created with the no-arg constructor is idle (no retry pending).
+  * An instance created with {@link #BackOffState(TopicPartition, long)} starts a
+  * new {@link BackOffExecution} from the enclosing consumer's back-off strategy
+  * and remembers the offset of the record that has to be retried.
+  * <p>
+  * State encoding used by the query methods:
+  * <ul>
+  *   <li>{@code backOffExecution != null}: a retry sequence is still active</li>
+  *   <li>{@code backOffExecution == null && timeNextRetryIsDue != null}: the
+  *       strategy has signalled {@link BackOffExecution#STOP}, i.e. the retries
+  *       are exhausted</li>
+  *   <li>both {@code null}: idle, or a retry has been marked as successful</li>
+  * </ul>
+  */
+ private class BackOffState
+ {
+   private final TopicPartition topicPartition;
+   private final long offset;
+   private final Instant startTime;
+
+   private BackOffExecution backOffExecution;
+   private int numRetries = 0;
+   private Instant timeNextRetryIsDue;
+
+
+   /**
+    * Creates an idle state: no partition, no start time, no back-off execution.
+    */
+   BackOffState()
+   {
+     topicPartition = null;
+     offset = 0;
+     startTime = null;
+   }
+
+   /**
+    * Starts a new back-off sequence for the record at the given offset.
+    *
+    * @param topicPartition the partition the failing record belongs to
+    * @param offset the offset of the record that has to be retried
+    */
+   BackOffState(TopicPartition topicPartition, long offset)
+   {
+     this.topicPartition = topicPartition;
+     this.offset = offset;
+     this.startTime = clock.instant();
+
+     log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+     backOffExecution = backOffStrategy.start();
+     initializeNextBackOff();
+   }
+
+   /**
+    * Tells whether the partition still has to wait before the next retry.
+    * <p>
+    * Side effect: if the scheduled retry time has passed, the retry counter is
+    * incremented and the following back-off interval is requested from the
+    * strategy before {@code false} is returned.
+    *
+    * @return {@code true} if a retry is scheduled and not yet due,
+    *         {@code false} if no back-off is active or the retry is due now
+    */
+   boolean isWaitingForNextRetry()
+   {
+     if (backOffExecution == null)
+     {
+       return false;
+     }
+
+     if (clock.instant().isAfter(timeNextRetryIsDue))
+     {
+       numRetries++;
+       initializeNextBackOff();
+       log.info("{} - Retrying {}", id, topicPartition);
+       return false;
+     }
+     else
+     {
+       log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
+       return true;
+     }
+   }
+
+   /**
+    * @param offset the offset of the record that just failed
+    * @return {@code true} if this state was created for exactly that offset and
+    *         has not been marked as successful
+    */
+   boolean isRetryInProgress(long offset)
+   {
+     return this.offset == offset && timeNextRetryIsDue != null;
+   }
+
+   /**
+    * @return {@code true} if the back-off strategy has signalled
+    *         {@link BackOffExecution#STOP}, i.e. no further retries are granted
+    */
+   boolean isUnsuccessful()
+   {
+     return backOffExecution == null && timeNextRetryIsDue != null;
+   }
+
+   /**
+    * @return the number of retries that have become due so far
+    */
+   int getNumRetries()
+   {
+     return numRetries;
+   }
+
+   /**
+    * @return the time between the start of the back-off sequence and the most
+    *         recently scheduled retry time
+    */
+   Duration getDurationSpendRetrying()
+   {
+     return Duration.between(startTime, timeNextRetryIsDue);
+   }
+
+   /**
+    * Ends an active retry sequence and resets the state to idle.
+    * Does nothing if no back-off execution is active.
+    */
+   void markRetryAsSuccessful()
+   {
+     if (backOffExecution != null)
+     {
+       log.info(
+         "{} - retry #{} for {} succeeded after {}",
+         id,
+         numRetries,
+         topicPartition,
+         Duration.between(startTime, clock.instant()));
+
+       backOffExecution = null;
+       timeNextRetryIsDue = null;
+     }
+   }
+
+   /**
+    * Asks the back-off execution for the next interval and schedules the next
+    * retry accordingly. On {@link BackOffExecution#STOP} the execution is
+    * dropped, which marks the retries as exhausted.
+    */
+   private void initializeNextBackOff()
+   {
+     long backOffMillis = backOffExecution.nextBackOff();
+
+     if (backOffMillis == BackOffExecution.STOP)
+     {
+       backOffExecution = null;
+       if (timeNextRetryIsDue == null)
+       {
+         // The strategy refused even the first retry. Without a due-time this
+         // state would look idle: isRetryInProgress() and isUnsuccessful()
+         // would both be false, and the failing record would be re-seeked and
+         // retried forever. Recording a due-time marks the state as exhausted.
+         timeNextRetryIsDue = clock.instant();
+       }
+     }
+     else
+     {
+       timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+     }
+   }
+ }
}