Tests: Offsets werden unter TopicPartition abgelegt
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 21d1668..8461824 100644 (file)
@@ -11,7 +11,10 @@ import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
@@ -35,6 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -61,12 +65,13 @@ class ApplicationTests
 
 
        @Test
+       @Order(1) // << The poison pill is not skipped. Hence, this test must run first
        void commitsCurrentOffsetsOnSuccess()
        {
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               Map<Integer, Long> offsets = runEndlessConsumer(record ->
+               Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
                {
                        received.add(record);
                        if (received.size() == 100)
@@ -77,6 +82,7 @@ class ApplicationTests
        }
 
        @Test
+       @Order(2)
        void commitsNoOffsetsOnError()
        {
                send100Messages(counter ->
@@ -84,9 +90,9 @@ class ApplicationTests
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-               Map<Integer, Long> oldOffsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
-               Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+               Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
 
                check(oldOffsets);
        }
@@ -133,11 +139,15 @@ class ApplicationTests
                }
        }
 
-       Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+       Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
        {
-               Map<Integer, Long> offsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
-               Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+               Map<TopicPartition, Long> offsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+               Consumer<ConsumerRecord<String, Long>> captureOffset =
+                               record ->
+                                               offsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
                EndlessConsumer<String, Long> endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
@@ -167,11 +177,11 @@ class ApplicationTests
                kafkaConsumer.unsubscribe();
        }
 
-       void check(Map<Integer, Long> offsets)
+       void check(Map<TopicPartition, Long> offsets)
        {
                doForCurrentOffsets((tp, offset) ->
                {
-                       Long expected = offsets.get(tp.partition()) + 1;
+                       Long expected = offsets.get(tp) + 1;
                        log.debug("Checking, if the offset for {} is {}", tp, expected);
                        assertThat(offset).isEqualTo(expected);
                });