From 803f73e0c0d6bf65e5887e7bfa5c45d8a5c29abc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 11 Sep 2022 15:24:27 +0200 Subject: [PATCH] =?utf8?q?Der=20Test=20pr=C3=BCft=20die=20Anzahl=20der=20E?= =?utf8?q?intr=C3=A4ge=20im=20DLT?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der Test schlägt fehl, weil die Überprüfung aufdeckt, dass der `DeadLetterTopicConsumer wegen einer Fehlkonfiguration nicht in der Lage ist, die Poistion-Pill-Nachrichten einzulesen. * Dies liegt daran, dass er die Consumer-Konfiguration der Anwendung verwendet und deswegen auch genau wie diese am Deserialisieren dieser Nachrichten scheitert. --- .../java/de/juplo/kafka/GenericApplicationTests.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index d65dd8e..003a178 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -70,6 +70,8 @@ abstract class GenericApplicationTests @Autowired TestRecordHandler recordHandler; @Autowired + DeadLetterTopicConsumer deadLetterTopicConsumer; + @Autowired EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; @@ -132,6 +134,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -164,6 +170,10 @@ abstract class GenericApplicationTests .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); + await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors()); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -348,6 +358,8 @@ abstract class GenericApplicationTests recordHandler.seenOffsets = new HashMap<>(); recordHandler.receivedMessages = 0; + deadLetterTopicConsumer.messages.clear(); + doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); -- 2.20.1