X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=3bac537d6e387d5b07083776d8dc687387f2d8e2;hb=be1b513f8bd7646f9ceb3a7ba90952641e3af125;hp=39ff0d7110cac758547928f8dc77e1fceec50e25;hpb=221f774959bb0d1e113bdeefd8fdc52ea1291081;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 39ff0d7..3bac537 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -40,7 +39,8 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC }) + "consumer.topic=" + TOPIC, + "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j class ApplicationTests @@ -74,7 +74,6 @@ class ApplicationTests /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages((partition, key, counter) -> @@ -85,10 +84,12 @@ class ApplicationTests await("100 records received") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted(() -> { checkSeenOffsetsForProgress(); @@ -101,8 +102,7 @@ class ApplicationTests } @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsOffsetOfErrorForReprocessingOnDeserializationError() { send100Messages((partition, key, counter) -> { @@ -114,6 +114,7 @@ class ApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -122,6 +123,7 @@ class ApplicationTests endlessConsumer.start(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -159,8 +161,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -175,6 +177,21 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); + partitions().forEach(tp -> + { + // seekToEnd() works lazily: it only takes effect on poll()/position() + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + }); + // The new positions must be commited! + offsetConsumer.commitSync(); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { offsetConsumer.assign(partitions()); @@ -206,7 +223,7 @@ class ApplicationTests for (int key = 0; key < 10; key++) { ProducerRecord record = - recordGenerator.generate(partition, Integer.toString(key%2), ++i); + recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); kafkaProducer.send(record, (metadata, e) -> { @@ -238,6 +255,8 @@ class ApplicationTests { testHandler = record -> {} ; + seekToEnd(); + oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); receivedRecords = new HashSet<>();