Der Test prüft die Anzahl der Einträge im DLT
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
index d65dd8e..003a178 100644 (file)
@@ -70,6 +70,8 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        TestRecordHandler recordHandler;
        @Autowired
+       DeadLetterTopicConsumer deadLetterTopicConsumer;
+       @Autowired
        EndlessConsumer endlessConsumer;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
@@ -132,6 +134,10 @@ abstract class GenericApplicationTests<K, V>
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -164,6 +170,10 @@ abstract class GenericApplicationTests<K, V>
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -348,6 +358,8 @@ abstract class GenericApplicationTests<K, V>
                recordHandler.seenOffsets = new HashMap<>();
                recordHandler.receivedMessages = 0;
 
+               deadLetterTopicConsumer.messages.clear();
+
                doForCurrentOffsets((tp, offset) ->
                {
                        oldOffsets.put(tp, offset - 1);