X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=24d3a9e0c07649282d3107c999cf49cffd94f029;hb=c032639acf2861b9039dc08e98bb7d9d1f59b086;hp=9169de009412bbac0c945ff6d552a4aa339393ab;hpb=a8aed63e92d58731176dde8b7cec4f5a022ac813;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 9169de0..24d3a9e 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -41,7 +41,8 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC }) + "consumer.topic=" + TOPIC, + "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j class ApplicationTests @@ -75,7 +76,6 @@ class ApplicationTests /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages((partition, key, counter) -> @@ -98,10 +98,12 @@ class ApplicationTests await("100 records received") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted(() -> { checkSeenOffsetsForProgress(); @@ -114,8 +116,7 @@ class ApplicationTests } @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsOffsetOfErrorForReprocessingOnDeserializationError() { send100Messages((partition, key, counter) -> { @@ -145,6 +146,7 @@ class ApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -153,6 +155,7 @@ class ApplicationTests endlessConsumer.start(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -190,8 +193,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -206,6 +209,21 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); + partitions().forEach(tp -> + { + // seekToEnd() works lazily: it only takes effect on poll()/position() + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + }); + // The new positions must be commited! + offsetConsumer.commitSync(); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { offsetConsumer.assign(partitions()); @@ -294,6 +312,8 @@ class ApplicationTests { testHandler = record -> {} ; + seekToEnd(); + oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); receivedRecords = new HashSet<>();