import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
@Slf4j
class BackOffState
{
- private final String id;
+ private final String logPrefix;
private final Clock clock;
- private final TopicPartition topicPartition;
private final BackOff backOffStrategy;
@Getter
void start(long offset)
{
this.offset = offset;
- log.info(
- "{} - Back-Off requested for offset={} in {}",
- id,
- offset,
- topicPartition);
+ log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
backOffExecution = backOffStrategy.start();
initializeNextBackOff();
}
if (remaining.isNegative())
{
log.info(
- "{} - {}. retry for offset={} in {}, lateness: {}",
- id,
+ "{} - {}. retry for offset={}, lateness: {}",
+ logPrefix,
numRetries,
offset,
- topicPartition,
remaining.abs());
initializeNextBackOff();
return false;
else
{
log.info(
- "{} - Next retry for offset={} in {} is due in {}",
- id,
+ "{} - Next retry for offset={} is due in {}",
+ logPrefix,
offset,
- topicPartition,
remaining);
return true;
}
package de.juplo.kafka;
-import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
class BackOffStateTest
{
final static String ID = "TEST";
- final static TopicPartition TOPIC_PARTITION = new TopicPartition("test", 0);
final static long OFFSET = 666;
final static Instant NOW = Instant.now();
final static long BACK_OFF = 1000l;
// GIVEN
// WHEN
- BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
return backOffState;
}
given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
// WHEN
- BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
backOffState.start(OFFSET);
return backOffState;
given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
// WHEN
- BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+ BackOffState backOffState = new BackOffState(ID, clock, backOff);
backOffState.start(OFFSET);
return backOffState;