X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=8461824159886ec396a53117f7d887e98affa9a9;hb=ad3b3853816932c81e019f7746f7589ffc2b4c55;hp=21d1668240d41d39481178821173d6a5f7e4bbc1;hpb=fe867d6d14fd90aab95bdd7ba9374a585c268d3f;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 21d1668..8461824 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,7 +11,10 @@ import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; @@ -35,6 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -61,12 +65,13 @@ class ApplicationTests @Test + @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() { send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); Set> received = new HashSet<>(); - Map offsets = runEndlessConsumer(record -> + Map offsets = runEndlessConsumer(record -> { received.add(record); if (received.size() == 100) @@ -77,6 +82,7 @@ class ApplicationTests } @Test + @Order(2) void commitsNoOffsetsOnError() { send100Messages(counter -> @@ -84,9 +90,9 @@ class ApplicationTests ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) : new Bytes(longSerializer.serialize(TOPIC, counter))); - Map oldOffsets = new HashMap<>(); - doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1)); - Map newOffsets = runEndlessConsumer((record) -> {}); + Map oldOffsets = new HashMap<>(); + doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); + Map newOffsets = runEndlessConsumer((record) -> {}); check(oldOffsets); } @@ -133,11 +139,15 @@ class ApplicationTests } } - Map runEndlessConsumer(Consumer> consumer) + Map runEndlessConsumer(Consumer> consumer) { - Map offsets = new HashMap<>(); - doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1)); - Consumer> captureOffset = record -> offsets.put(record.partition(), record.offset()); + Map offsets = new HashMap<>(); + doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1)); + Consumer> captureOffset = + record -> + offsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); EndlessConsumer endlessConsumer = new EndlessConsumer<>( executor, @@ -167,11 +177,11 @@ class ApplicationTests kafkaConsumer.unsubscribe(); } - void check(Map offsets) + void check(Map offsets) { doForCurrentOffsets((tp, offset) -> { - Long expected = offsets.get(tp.partition()) + 1; + Long expected = offsets.get(tp) + 1; log.debug("Checking, if the offset for {} is {}", tp, expected); assertThat(offset).isEqualTo(expected); });