Der Test prüft die Anzahl der Einträge im DLT
authorKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 13:24:27 +0000 (15:24 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 19:13:21 +0000 (21:13 +0200)
* Der Test schlägt fehl, weil die Überprüfung aufdeckt, dass der
  `DeadLetterTopicConsumer wegen einer Fehlkonfiguration nicht in der
  Lage ist, die Poistion-Pill-Nachrichten einzulesen.
* Dies liegt daran, dass er die Consumer-Konfiguration der Anwendung
  verwendet und deswegen auch genau wie diese am Deserialisieren dieser
  Nachrichten scheitert.

src/test/java/de/juplo/kafka/GenericApplicationTests.java

index d65dd8e..003a178 100644 (file)
@@ -70,6 +70,8 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        TestRecordHandler recordHandler;
        @Autowired
+       DeadLetterTopicConsumer deadLetterTopicConsumer;
+       @Autowired
        EndlessConsumer endlessConsumer;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
@@ -132,6 +134,10 @@ abstract class GenericApplicationTests<K, V>
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -164,6 +170,10 @@ abstract class GenericApplicationTests<K, V>
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
                                .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+               await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -348,6 +358,8 @@ abstract class GenericApplicationTests<K, V>
                recordHandler.seenOffsets = new HashMap<>();
                recordHandler.receivedMessages = 0;
 
+               deadLetterTopicConsumer.messages.clear();
+
                doForCurrentOffsets((tp, offset) ->
                {
                        oldOffsets.put(tp, offset - 1);