package de.juplo.kafka;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.util.backoff.BackOff;
import java.time.Instant;
+@RequiredArgsConstructor
@Slf4j
class BackOffState
{
private final String id;
private final Clock clock;
private final TopicPartition topicPartition;
- @Getter
- private final long offset;
+ private final BackOff backOffStrategy;
+ @Getter
+ private long offset;
private BackOffExecution backOffExecution;
private int numRetries = 0;
private Instant timeNextRetryIsDue;
- BackOffState()
- {
- id = "NONE";
- clock = null;
- topicPartition = null;
- offset = -1;
- }
-
- BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
+ void start(long offset)
{
- this.id = id;
- this.clock = clock;
- this.topicPartition = topicPartition;
this.offset = offset;
-
log.info(
"{} - Back-Off requested for offset={} in {}",
id,
void reset()
{
timeNextRetryIsDue = null;
+ offset = -1l;
}
private void initializeNextBackOff()
if (!backOffState[partition].isStarted(record.offset()))
{
log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
- backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
+ backOffState[partition].start(record.offset());
consumer.seek(topicPartition, record.offset());
break;
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- partitions.forEach(tp -> backOffState[tp.partition()] = new BackOffState());
+ partitions.forEach(topicPartition ->
+ backOffState[topicPartition.partition()] = new BackOffState(
+ id,
+ clock,
+ topicPartition,
+ backOffStrategy));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ partitions.forEach(tp -> backOffState[tp.partition()] = null);
}
private BackOffState NotStartedBackOffState()
{
- return new BackOffState();
+ // GIVEN
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+
+ return backOffState;
}
@Test
given(backOff.start()).willReturn(backOffExecution);
given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
- return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
}
@Test
private BackOffState StartedBackoffStateWithRetries()
{
+ // GIVEN
given(clock.instant()).willReturn(NOW);
given(backOff.start()).willReturn(backOffExecution);
given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
- return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+ backOffState.start(OFFSET);
+
+ return backOffState;
}
@Test
}
@Test
- @DisplayName("A started BackOffState is started if it is marked as completed")
- void StartedBackOffStateIsStartedIfMarkedCompleted()
+ @DisplayName("A started BackOffState is not started after a reset")
+ void StartedBackOffStateIsNotStartedAfterReset()
{
// GIVEN
BackOffState backOffState = StartedBackoffStateWithRetries();
backOffState.reset();
// THEN
- assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
}
}