X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=6f58180f03c4a6a0a4741af5e43ba2c5e6155ca5;hb=4a6907ee4675c3d9bc7fd4ba31a7048527f53a50;hp=bf38b0569ce1bd0fb67e91621173f8c32facc2bd;hpb=0e0f30edc9bdcd230db1e7f4cbe414f85d5d288a;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index bf38b05..6f58180 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -71,6 +71,8 @@ class ApplicationTests Map newOffsets; + /** Tests methods */ + @Test @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException @@ -84,15 +86,18 @@ class ApplicationTests .atMost(Duration.ofSeconds(30)) .until(() -> received.size() >= 100); - endlessConsumer.stop(); - - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + }); } @Test @Order(2) - void commitsNoOffsetsOnError() + void commitsOffsetOfErrorForReprocessingOnError() { send100Messages(counter -> counter == 77 @@ -104,7 +109,64 @@ class ApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + compareToCommitedOffsets(newOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + } + + + /** Helper methods for the verification of expectations */ + + void compareToCommitedOffsets(Map offsetsToCheck) + { + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + log.debug("Checking, if the offset for {} is {}", tp, expected); + assertThat(offset).isEqualTo(expected); + }); + } + + void checkSeenOffsetsForProgress() + { + // Be sure, that some messages were consumed...! + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + } + + + /** Helper methods for setting up and running the tests */ + + void doForCurrentOffsets(BiConsumer consumer) + { + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); + } + + List partitions() + { + return + IntStream + .range(0, PARTITIONS) + .mapToObj(partition -> new TopicPartition(TOPIC, partition)) + .collect(Collectors.toList()); } @@ -149,6 +211,7 @@ class ApplicationTests } } + @BeforeEach public void init() { @@ -183,50 +246,6 @@ class ApplicationTests endlessConsumer.start(); } - List partitions() - { - return - IntStream - .range(0, PARTITIONS) - .mapToObj(partition -> new TopicPartition(TOPIC, partition)) - .collect(Collectors.toList()); - } - - void doForCurrentOffsets(BiConsumer consumer) - { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); - } - - void checkSeenOffsetsForProgress() - { - // Be sure, that some messages were consumed...! - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); - } - - void compareToCommitedOffsets(Map offsetsToCheck) - { - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); - assertThat(offset).isEqualTo(expected); - }); - } - - @AfterEach public void deinit() { @@ -240,6 +259,7 @@ class ApplicationTests } } + @TestConfiguration @Import(ApplicationConfiguration.class) public static class Configuration