X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=b98066fcb1c6bbea2aeabd36a95547f7c8d7f7e6;hb=ac154bb18a6c575fe01e70cba6a86d10580dfb89;hp=4793d960e0530b257f0a5c20313a8179a3156bfc;hpb=7a7926c1799495a3ed016cb1b204cbfe13f833f1;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 4793d96..b98066f 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @@ -124,32 +124,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(true, false, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfPoisonPills(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedMessages) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should have exited") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -159,28 +156,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(false, true, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfLogicErrors(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should not be running") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); }