From: Kai Moritz Date: Thu, 18 Aug 2022 21:36:22 +0000 (+0200) Subject: GRÜN: Fehler in der Test-Logik korrigiert X-Git-Tag: sumup-adder---lvm-2-tage~9^2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9ae781a6e047a7b857aaf7fd79d134eb7b48b267;p=demos%2Fkafka%2Ftraining GRÜN: Fehler in der Test-Logik korrigiert * Die Assertion, dass nach einem wiederholten Versuch, den Logik-Fehler zu konsumieren nicht mehr Nachrichten konsumiert wurden, als für den Test generiert wurden ist nicht gültig, da bei einem Logik-Fehler ja gerade _kein_ Commit der zuletzt gelesenen Nachrichten erfolgt, da dies dazu führt, dass der Offset für Partitionen erhöht wird, für die vor dem Eintreten des Fehlers noch nicht alle Nachrichten gelesen wurden, wenn nicht explizti Seek's für diese Partitionen durchgeführt werden. * Die Assertion, dass die Offset-Position nach einem Fehler der Offset- Position _vor_ der Ausführung der Fachlogik entspricht ist falsch, da durchaus Commits durchgeführt werden können, bevor der Fehler auftritt. Daher wird jetzt explizit geprüft, dass ** Die Offset-Position für keine Partition größer ist, als der Offset der dort zuletzt gesehenen Nachricht. ** UND mindestens eine Partition existiert, deren Offset _kleiner_ ist, als der Offset der zuletzt gesehenen Nachricht. --- diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index b0abf37..649cdba 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -59,7 +59,7 @@ abstract class GenericApplicationTests KafkaConsumer offsetConsumer; EndlessConsumer endlessConsumer; Map oldOffsets; - Map newOffsets; + Map seenOffsets; Set> receivedRecords; @@ -92,7 +92,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -115,7 +115,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -124,7 +124,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -152,7 +152,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -160,11 +160,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -179,18 +175,37 @@ abstract class GenericApplicationTests /** Helper methods for the verification of expectations */ - void compareToCommitedOffsets(Map offsetsToCheck) + void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); + log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); assertThat(offset) .describedAs("Committed offset corresponds to the offset of the consumer") .isEqualTo(expected); }); } + void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) + { + List isOffsetBehindSeen = new LinkedList<>(); + + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); + assertThat(offset) + .describedAs("Committed offset corresponds to the offset of the consumer") + .isLessThanOrEqualTo(expected); + isOffsetBehindSeen.add(offset < expected); + }); + + assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) + .describedAs("Committed offsets are behind seen offsets") + .isTrue(); + } + void checkSeenOffsetsForProgress() { // Be sure, that some messages were consumed...! @@ -198,7 +213,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = newOffsets.get(tp) + 1; + Long newOffset = seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -315,19 +330,19 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - newOffsets = new HashMap<>(); + seenOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - newOffsets.put(tp, offset - 1); + seenOffsets.put(tp, offset - 1); }); Consumer> captureOffsetAndExecuteTestHandler = record -> { - newOffsets.put( + seenOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); receivedRecords.add(record);