Test prüft ungültige und unbekannte Nachrichten
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
index 9169de0..b5644b6 100644 (file)
@@ -41,7 +41,8 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "consumer.topic=" + TOPIC })
+                               "consumer.topic=" + TOPIC,
+                               "consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
 class ApplicationTests
@@ -75,8 +76,7 @@ class ApplicationTests
        /** Tests methods */
 
        @Test
-       @Order(1) // << The poistion pill is not skipped. Hence, this test must run first
-       void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
+       void commitsCurrentOffsetsOnSuccess()
        {
                send100Messages((partition, key, counter) ->
                {
@@ -89,19 +89,21 @@ class ApplicationTests
                                type = "message";
                        }
                        else {
-                               value = serializeGreeting(key, counter);
+                               value = serializeGreeting(key);
                                type = "greeting";
                        }
 
-                       return toRecord(partition, key, value, type);
+                       return toRecord(partition, key, value, Optional.of(type));
                });
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> receivedRecords.size() >= 100);
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
+                               .pollInterval(Duration.ofSeconds(1))
                                .untilAsserted(() ->
                                {
                                        checkSeenOffsetsForProgress();
@@ -114,8 +116,64 @@ class ApplicationTests
        }
 
        @Test
-       @Order(2)
-       void commitsOffsetOfErrorForReprocessingOnError()
+       void commitsOffsetOfErrorForReprocessingOnDeserializationErrorInvalidMessage()
+       {
+               send100Messages((partition, key, counter) ->
+               {
+                       Bytes value;
+                       String type;
+
+                       if (counter == 77)
+                       {
+                               value = serializeFooMessage(key, counter);
+                               type = null;
+                       }
+                       else
+                       {
+                               if (counter%3 != 0)
+                               {
+                                       value = serializeClientMessage(key, counter);
+                                       type = "message";
+                               }
+                               else {
+                                       value = serializeGreeting(key);
+                                       type = "greeting";
+                               }
+                       }
+
+                       return toRecord(partition, key, value, Optional.ofNullable(type));
+               });
+
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
+
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RecordDeserializationException.class);
+       }
+
+       @Test
+       void commitsOffsetOfErrorForReprocessingOnDeserializationErrorOnUnknownMessage()
        {
                send100Messages((partition, key, counter) ->
                {
@@ -135,16 +193,17 @@ class ApplicationTests
                                        type = "message";
                                }
                                else {
-                                       value = serializeGreeting(key, counter);
+                                       value = serializeGreeting(key);
                                        type = "greeting";
                                }
                        }
 
-                       return toRecord(partition, key, value, type);
+                       return toRecord(partition, key, value, Optional.of(type));
                });
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
 
                checkSeenOffsetsForProgress();
@@ -153,6 +212,7 @@ class ApplicationTests
                endlessConsumer.start();
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
                                .until(() -> !endlessConsumer.running());
 
                checkSeenOffsetsForProgress();
@@ -190,8 +250,8 @@ class ApplicationTests
                Set<TopicPartition> withProgress = new HashSet<>();
                partitions().forEach(tp ->
                {
-                       Long oldOffset = oldOffsets.get(tp);
-                       Long newOffset = newOffsets.get(tp);
+                       Long oldOffset = oldOffsets.get(tp) + 1;
+                       Long newOffset = newOffsets.get(tp) + 1;
                        if (!oldOffset.equals(newOffset))
                        {
                                log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@ -206,6 +266,21 @@ class ApplicationTests
 
        /** Helper methods for setting up and running the tests */
 
+       void seekToEnd()
+       {
+               offsetConsumer.assign(partitions());
+               offsetConsumer.seekToEnd(partitions());
+               partitions().forEach(tp ->
+               {
+                       // seekToEnd() works lazily: it only takes effect on poll()/position()
+                       Long offset = offsetConsumer.position(tp);
+                       log.info("New position for {}: {}", tp, offset);
+               });
+               // The new positions must be commited!
+               offsetConsumer.commitSync();
+               offsetConsumer.unsubscribe();
+       }
+
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
                offsetConsumer.assign(partitions());
@@ -225,12 +300,12 @@ class ApplicationTests
 
        public interface RecordGenerator<K, V>
        {
-               public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
+               public ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
        }
 
        void send100Messages(RecordGenerator recordGenerator)
        {
-               long i = 0;
+               int i = 0;
 
                for (int partition = 0; partition < 10; partition++)
                {
@@ -263,29 +338,30 @@ class ApplicationTests
                }
        }
 
-       ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, String type)
-       {
+       ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, Optional<String> type)
+               {
                ProducerRecord<String, Bytes> record =
                                new ProducerRecord<>(TOPIC, partition, key, value);
-               record.headers().add("__TypeId__", type.getBytes());
+
+               type.ifPresent(typeId -> record.headers().add("__TypeId__", typeId.getBytes()));
                return record;
        }
 
-       Bytes serializeClientMessage(String key, Long value)
+       Bytes serializeClientMessage(String key, int value)
        {
-               TestClientMessage message = new TestClientMessage(key, value.toString());
+               TestClientMessage message = new TestClientMessage(key, Integer.toString(value));
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }
 
-       Bytes serializeGreeting(String key, Long value)
+       Bytes serializeGreeting(String key)
        {
                TestGreeting message = new TestGreeting(key, LocalDateTime.now());
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }
 
-       Bytes serializeFooMessage(String key, Long value)
+       Bytes serializeFooMessage(String key, int value)
        {
-               TestFooMessage message = new TestFooMessage(key, value);
+               TestFooMessage message = new TestFooMessage(key, (long)value);
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }
 
@@ -294,6 +370,8 @@ class ApplicationTests
        {
                testHandler = record -> {} ;
 
+               seekToEnd();
+
                oldOffsets = new HashMap<>();
                newOffsets = new HashMap<>();
                receivedRecords = new HashSet<>();