--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Instant;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+
+@ExtendWith(MockitoExtension.class)
+class BackOffStateTest
+{
+ final static String ID = "TEST";
+ final static TopicPartition TOPIC_PARTITION = new TopicPartition("test", 0);
+ final static long OFFSET = 666;
+ final static long OTHER_OFFSET = 1;
+ final static Instant NOW = Instant.now();
+
+
+ @Test
+ @DisplayName("A not started BackOffState is not waiting for a retry")
+ void NotStartedBackOffStateIsNotWaitingForRetry()
+ {
+ // GIVEN
+ BackOffState backOffState = new BackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not started")
+ void NotStartedBackOffStateIsNotStarted()
+ {
+ // GIVEN
+ BackOffState backOffState = new BackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A not started BackOffState is not completed")
+ void NotStartedBackOffStateIsNotCompleted()
+ {
+ // GIVEN
+ BackOffState backOffState = new BackOffState();
+
+ // WHEN
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is not waiting for a retry")
+ void StartedBackOffStateWithNoRetriesIsNotWaitingForRetry(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is started")
+ void StartedBackOffStateWithNoRetriesIsStarted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState with no retries is completed")
+ void StartedBackOffStateWithNoRetriesIsCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is waiting for a retry if the time is not due")
+ void StartedBackOffStateIsWaitingForRetryIfTimeIsNotDue(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1000));
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is not due")
+ void StartedBackOffStateIsStarted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1000));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started for other offsets if the time is not due")
+ void StartedBackOffStateIsNotStartedForOtherOffsetsIfTheTimeIsNotDue(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1000));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OTHER_OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is not due")
+ void StartedBackOffStateIsNotCompletedIfTimeIsNotDue(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1000));
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueButRetryNotCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueButRetryNotCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started for other offsets if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotStartedForOtherOffsetsIfTheTimeIsDueButRetryNotCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OTHER_OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if the time is due but the retry not yet completed")
+ void StartedBackOffStateIsNotCompletedIfTheTimeIsDueButRetryNotCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if the time is due and the retry is completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfTheTimeIsDueAndRetryIsCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ boolean result = backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is started if the time is due and the retry is completed")
+ void StartedBackOffStateIsStartedIfTheTimeIsDueAndRetryIsCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started for other offsets if the time is due and the retry is completed")
+ void StartedBackOffStateIsNotStartedForOtherOffsetsIfTheTimeIsDueAndRetryIsCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isStarted(OTHER_OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is completed if the time is due and the retry is completed")
+ void StartedBackOffStateIsCompletedIfTheTimeIsDueAndRetryIsCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ given(clock.instant()).willReturn(NOW.plusMillis(1001));
+ given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
+
+ // WHEN
+ backOffState.isWaitingForNextRetry();
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isTrue();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not waiting for a retry if it is marked as completed")
+ void StartedBackOffStateIsNotWaitingForRetryIfMarkedCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // WHEN
+ backOffState.markAsCompleted();
+
+ // THEN
+ assertThat(backOffState.isWaitingForNextRetry()).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started if it is marked as completed")
+ void StartedBackOffStateIsNotStartedIfMarkedCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // WHEN
+ backOffState.markAsCompleted();
+
+ // THEN
+ assertThat(backOffState.isStarted(OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not started for other offsets if it is marked as completed")
+ void StartedBackOffStateIsNotStartedForOtherOffsetsIfMarkedCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // WHEN
+ backOffState.markAsCompleted();
+
+ // THEN
+ assertThat(backOffState.isStarted(OTHER_OFFSET)).isFalse();
+ }
+
+ @Test
+ @DisplayName("A started BackOffState is not completed if it is marked as completed")
+ void StartedBackOffStateIsNotCompletedIfMarkedCompleted(
+ @Mock Clock clock,
+ @Mock BackOff backOff,
+ @Mock BackOffExecution backOffExecution)
+ {
+ // GIVEN
+ given(clock.instant()).willReturn(NOW);
+ given(backOff.start()).willReturn(backOffExecution);
+ given(backOffExecution.nextBackOff()).willReturn(1000l);
+
+ BackOffState backOffState = new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+
+ // WHEN
+ backOffState.markAsCompleted();
+
+ // THEN
+ assertThat(backOffState.isCompleted()).isFalse(); // ??
+ }
+}