package de.juplo.kafka;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
{
log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
- consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset);
+ consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
continue;
}
id,
record.offset(),
partition);
- backOffState[partition] = new BackOffState(topicPartition, record.offset());
+ backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
consumer.seek(topicPartition, record.offset());
break;
}
}
- private class BackOffState
+ @Slf4j
+ static class BackOffState
{
+ private final String id;
+ private final Clock clock;
private final TopicPartition topicPartition;
+ @Getter
private final long offset;
private final Instant startTime;
BackOffState()
{
+ id = "NONE";
+ clock = null;
topicPartition = null;
offset = -1;
startTime = null;
}
- BackOffState(TopicPartition topicPartition, long offset)
+ BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
{
+ this.id = id;
+ this.clock = clock;
this.topicPartition = topicPartition;
this.offset = offset;
this.startTime = clock.instant();