X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=649cdba9ffc1de59d4a7b2db2316f37f9a303f06;hb=9ae781a6e047a7b857aaf7fd79d134eb7b48b267;hp=1aacb945c5d62495ef45d01aad5669e8bfa3cc0b;hpb=27768041f2c2f4b1cbb8c45c9a5d665490050f76;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 1aacb94..649cdba 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -37,7 +37,7 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s" }) + "consumer.commit-interval=500ms" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j abstract class GenericApplicationTests @@ -59,7 +59,7 @@ abstract class GenericApplicationTests KafkaConsumer offsetConsumer; EndlessConsumer endlessConsumer; Map oldOffsets; - Map newOffsets; + Map seenOffsets; Set> receivedRecords; @@ -78,12 +78,13 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() { - recordGenerator.generate(100, Set.of(), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, false, messageSender); - await("100 records received") + await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= 100); + .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -91,19 +92,22 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + + recordGenerator.assertBusinessLogic(); } @Test @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - recordGenerator.generate(100, Set.of(77), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(true, false, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -111,7 +115,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -120,10 +124,10 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") - .isLessThan(100); + .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -131,13 +135,16 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RecordDeserializationException.class); + + recordGenerator.assertBusinessLogic(); } @Test @SkipWhenErrorCannotBeGenerated(logicError = true) void doesNotCommitOffsetsOnLogicError() { - recordGenerator.generate(100, Set.of(), Set.of(77), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, true, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -145,7 +152,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -153,11 +160,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(100); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -165,23 +168,44 @@ abstract class GenericApplicationTests assertThat(endlessConsumer.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RuntimeException.class); + + recordGenerator.assertBusinessLogic(); } /** Helper methods for the verification of expectations */ - void compareToCommitedOffsets(Map offsetsToCheck) + void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); + log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); assertThat(offset) .describedAs("Committed offset corresponds to the offset of the consumer") .isEqualTo(expected); }); } + void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) + { + List isOffsetBehindSeen = new LinkedList<>(); + + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); + assertThat(offset) + .describedAs("Committed offset corresponds to the offset of the consumer") + .isLessThanOrEqualTo(expected); + isOffsetBehindSeen.add(offset < expected); + }); + + assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) + .describedAs("Committed offsets are behind seen offsets") + .isTrue(); + } + void checkSeenOffsetsForProgress() { // Be sure, that some messages were consumed...! @@ -189,7 +213,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = newOffsets.get(tp) + 1; + Long newOffset = seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -238,12 +262,12 @@ abstract class GenericApplicationTests public interface RecordGenerator { - void generate( - Set poisonPills, - Set logicErrors, + int generate( + boolean poisonPills, + boolean logicErrors, Consumer> messageSender); - default boolean canGeneratePoisionPill() + default boolean canGeneratePoisonPill() { return true; } @@ -252,6 +276,11 @@ abstract class GenericApplicationTests { return true; } + + default void assertBusinessLogic() + { + log.debug("No business-logic to assert"); + } } void sendMessage(ProducerRecord record) @@ -301,19 +330,19 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - newOffsets = new HashMap<>(); + seenOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - newOffsets.put(tp, offset - 1); + seenOffsets.put(tp, offset - 1); }); Consumer> captureOffsetAndExecuteTestHandler = record -> { - newOffsets.put( + seenOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); receivedRecords.add(record);