From: Kai Moritz Date: Sun, 11 Sep 2022 13:24:27 +0000 (+0200) Subject: Der Test prüft die Anzahl der Einträge im DLT X-Git-Tag: sumup-adder--springified---lvm-2-tage~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=803f73e0c0d6bf65e5887e7bfa5c45d8a5c29abc;p=demos%2Fkafka%2Ftraining Der Test prüft die Anzahl der Einträge im DLT * Der Test schlägt fehl, weil die Überprüfung aufdeckt, dass der `DeadLetterTopicConsumer wegen einer Fehlkonfiguration nicht in der Lage ist, die Poistion-Pill-Nachrichten einzulesen. * Dies liegt daran, dass er die Consumer-Konfiguration der Anwendung verwendet und deswegen auch genau wie diese am Deserialisieren dieser Nachrichten scheitert. --- diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index d65dd8e..003a178 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -70,6 +70,8 @@ abstract class GenericApplicationTests @Autowired TestRecordHandler recordHandler; @Autowired + DeadLetterTopicConsumer deadLetterTopicConsumer; + @Autowired EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; @@ -132,6 +134,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -164,6 +170,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -348,6 +358,8 @@ abstract class GenericApplicationTests recordHandler.seenOffsets = new HashMap<>(); recordHandler.receivedMessages = 0; + deadLetterTopicConsumer.messages.clear(); + doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1);