X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=26a34e406639f652c53878dcd8db12d7174598fd;hb=c446512ec3bfa29e5e8482074cb6daf7e2ee1b2f;hp=05eebd0417f660e9ef3773e6de759dece1922037;hpb=ee7e93b30bcee5b709ffc1631a2b0196a8194c5a;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 05eebd0..26a34e4 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -40,7 +39,8 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC }) + "consumer.topic=" + TOPIC, + "consumer.commit-interval=100ms" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j class ApplicationTests @@ -74,7 +74,6 @@ class ApplicationTests /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages((partition, key, counter) -> @@ -101,8 +100,7 @@ class ApplicationTests } @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsOffsetOfErrorForReprocessingOnDeserializationError() { send100Messages((partition, key, counter) -> { @@ -159,8 +157,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -175,6 +173,21 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); + partitions().forEach(tp -> + { + // seekToEnd() works lazily: it only takes effect on poll()/position() + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + }); + // The new positions must be commited! + offsetConsumer.commitSync(); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { offsetConsumer.assign(partitions()); @@ -238,6 +251,8 @@ class ApplicationTests { testHandler = record -> {} ; + seekToEnd(); + oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); receivedRecords = new HashSet<>();