X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=003a1780386cb5d139dee5a3154f56d7134f77f2;hb=803f73e0c0d6bf65e5887e7bfa5c45d8a5c29abc;hp=d65dd8e7af64635b5adb483d2c6d12aa02193871;hpb=e54f6036fecc54ea37001dae5f64d88502acfbe1;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index d65dd8e..003a178 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -70,6 +70,8 @@ abstract class GenericApplicationTests @Autowired TestRecordHandler recordHandler; @Autowired + DeadLetterTopicConsumer deadLetterTopicConsumer; + @Autowired EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; @@ -132,6 +134,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -164,6 +170,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -348,6 +358,8 @@ abstract class GenericApplicationTests recordHandler.seenOffsets = new HashMap<>(); recordHandler.receivedMessages = 0; + deadLetterTopicConsumer.messages.clear(); + doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1);