X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=ebad5a80e950f239a63008ded28430468bc312b2;hb=4b19a0061b88863c015424088f429b6998557dc8;hp=1aacb945c5d62495ef45d01aad5669e8bfa3cc0b;hpb=27768041f2c2f4b1cbb8c45c9a5d665490050f76;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 1aacb94..ebad5a8 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -78,12 +78,13 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() { - recordGenerator.generate(100, Set.of(), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, false, messageSender); - await("100 records received") + await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= 100); + .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -97,13 +98,16 @@ abstract class GenericApplicationTests assertThatExceptionOfType(IllegalStateException.class) .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + + recordGenerator.assertBusinessLogic(); } @Test @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - recordGenerator.generate(100, Set.of(77), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(true, false, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -123,7 +127,7 @@ abstract class GenericApplicationTests compareToCommitedOffsets(newOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") - .isLessThan(100); + .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -131,13 +135,16 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RecordDeserializationException.class); + + recordGenerator.assertBusinessLogic(); } @Test @SkipWhenErrorCannotBeGenerated(logicError = true) void doesNotCommitOffsetsOnLogicError() { - recordGenerator.generate(100, Set.of(), Set.of(77), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, true, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -157,7 +164,7 @@ abstract class GenericApplicationTests compareToCommitedOffsets(oldOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") - .isLessThan(100); + .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -165,6 +172,8 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RuntimeException.class); + + recordGenerator.assertBusinessLogic(); } @@ -238,12 +247,12 @@ abstract class GenericApplicationTests public interface RecordGenerator { - void generate( - Set poisonPills, - Set logicErrors, + int generate( + boolean poisonPills, + boolean logicErrors, Consumer> messageSender); - default boolean canGeneratePoisionPill() + default boolean canGeneratePoisonPill() { return true; } @@ -252,6 +261,11 @@ abstract class GenericApplicationTests { return true; } + + default void assertBusinessLogic() + { + log.debug("No business-logic to assert"); + } } void sendMessage(ProducerRecord record)