import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.time.Clock;
@Test
void testOnlyValidMessages()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testDeserializationException()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testUnexpectedDomainError() throws Exception
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testNonRetryableDomainError() throws Exception
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
void testOneMessageCausesRetryableDomainErrors(int numFailures)
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Test
void testOneMessageCausesRetryableDomainErrors()
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
int numFailuresForMessageB,
int numFailuresForMessageC)
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
int numFailuresForMessageB,
int numFailuresForMessageC)
{
+ createExampleConsumer();
+
sendValidMessage(0);
sendValidMessage(1);
sendValidMessage(2);
@Autowired
KafkaTemplate<String, byte[]> kafkaTemplate;
+ @Autowired ApplicationProperties properties;
+ @Autowired Clock clock;
final LongSerializer serializer = new LongSerializer();
final long[] currentOffsets = new long[NUM_PARTITIONS];
ExampleConsumer exampleConsumer;
- @BeforeEach
- void createExampleConsumer(
- @Autowired ApplicationProperties properties,
- @Autowired Clock clock)
+ void createExampleConsumer()
+ {
+ createExampleConsumer(new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()));
+ }
+
+ void createExampleConsumer(BackOff backOff)
{
ApplicationConfiguration configuration = new ApplicationConfiguration();
Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
properties.getConsumerProperties().getPollRequestTimeout(),
properties.getConsumerProperties().getMaxPollInterval(),
properties.getConsumerProperties().getMaxTimePerRecord(),
- new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()),
+ backOff,
() -> isTerminatedExceptionally.set(true));
}