import java.util.stream.IntStream;
import static de.juplo.kafka.ExampleConsumerTest.*;
+import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(
Awaitility
.await("All messages are consumed")
.atMost(Duration.ofSeconds(5))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
@DisplayName("Delayed messages are consumed as expected")
Awaitility
.await("All messages are consumed")
.atMost(Duration.ofSeconds(5))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
@DisplayName("A deserialization exception is skipped and all valid messages are consumed")
Awaitility
.await("All valid messages are consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
}
@DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
.await("The ConsumerRunnable is exited by an unexpected exception")
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(250))
- .until(() -> isTerminatedExceptionally.get());
+ .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
}
@DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
Awaitility
.await("All other valid messages are consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
}
@DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
Awaitility
.await("All messages are eventually consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
@DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
Awaitility
.await("All messages are eventually consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}
@DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
Awaitility
.await("All other messages are eventually consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
}
@DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
Awaitility
.await("All messages are eventually consumed")
.atMost(Duration.ofSeconds(20))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 30);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(30));
}
@DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
Awaitility
.await("All other messages are eventually consumed")
.atMost(Duration.ofSeconds(20))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 29);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(29));
}
@DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried 3 times with a fixed back-of and all messages are eventually consumed")
Awaitility
.await("All messages are eventually consumed")
.atMost(Duration.ofSeconds(15))
- .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
}