@Autowired
TestRecordHandler recordHandler;
@Autowired
+ DeadLetterTopicConsumer deadLetterTopicConsumer;
+ @Autowired
EndlessConsumer endlessConsumer;
KafkaProducer<Bytes, Bytes> testRecordProducer;
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+ await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+ await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
recordHandler.seenOffsets = new HashMap<>();
recordHandler.receivedMessages = 0;
+ deadLetterTopicConsumer.messages.clear();
+
doForCurrentOffsets((tp, offset) ->
{
oldOffsets.put(tp, offset - 1);