Test prüft ungültige und unbekannte Nachrichten springified-consumer--serialization
authorKai Moritz <kai@juplo.de>
Tue, 26 Jul 2022 14:21:17 +0000 (16:21 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 26 Jul 2022 14:21:17 +0000 (16:21 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 24d3a9e..b5644b6 100644 (file)
@@ -76,7 +76,7 @@ class ApplicationTests
        /** Tests methods */
 
        @Test
-       void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
+       void commitsCurrentOffsetsOnSuccess()
        {
                send100Messages((partition, key, counter) ->
                {
@@ -89,11 +89,11 @@ class ApplicationTests
                                type = "message";
                        }
                        else {
-                               value = serializeGreeting(key, counter);
+                               value = serializeGreeting(key);
                                type = "greeting";
                        }
 
-                       return toRecord(partition, key, value, type);
+                       return toRecord(partition, key, value, Optional.of(type));
                });
 
                await("100 records received")
@@ -116,7 +116,64 @@ class ApplicationTests
        }
 
        @Test
-       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
+       void commitsOffsetOfErrorForReprocessingOnDeserializationErrorInvalidMessage()
+       {
+               send100Messages((partition, key, counter) ->
+               {
+                       Bytes value;
+                       String type;
+
+                       if (counter == 77)
+                       {
+                               value = serializeFooMessage(key, counter);
+                               type = null;
+                       }
+                       else
+                       {
+                               if (counter%3 != 0)
+                               {
+                                       value = serializeClientMessage(key, counter);
+                                       type = "message";
+                               }
+                               else {
+                                       value = serializeGreeting(key);
+                                       type = "greeting";
+                               }
+                       }
+
+                       return toRecord(partition, key, value, Optional.ofNullable(type));
+               });
+
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+
+               endlessConsumer.start();
+               await("Consumer failed")
+                               .atMost(Duration.ofSeconds(30))
+                               .pollInterval(Duration.ofSeconds(1))
+                               .until(() -> !endlessConsumer.running());
+
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
+               assertThat(receivedRecords.size())
+                               .describedAs("Received not all sent events")
+                               .isLessThan(100);
+
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
+               assertThat(endlessConsumer.exitStatus())
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RecordDeserializationException.class);
+       }
+
+       @Test
+       void commitsOffsetOfErrorForReprocessingOnDeserializationErrorOnUnknownMessage()
        {
                send100Messages((partition, key, counter) ->
                {
@@ -136,12 +193,12 @@ class ApplicationTests
                                        type = "message";
                                }
                                else {
-                                       value = serializeGreeting(key, counter);
+                                       value = serializeGreeting(key);
                                        type = "greeting";
                                }
                        }
 
-                       return toRecord(partition, key, value, type);
+                       return toRecord(partition, key, value, Optional.of(type));
                });
 
                await("Consumer failed")
@@ -243,12 +300,12 @@ class ApplicationTests
 
        public interface RecordGenerator<K, V>
        {
-               public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
+               public ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
        }
 
        void send100Messages(RecordGenerator recordGenerator)
        {
-               long i = 0;
+               int i = 0;
 
                for (int partition = 0; partition < 10; partition++)
                {
@@ -281,29 +338,30 @@ class ApplicationTests
                }
        }
 
-       ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, String type)
-       {
+       ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, Optional<String> type)
+               {
                ProducerRecord<String, Bytes> record =
                                new ProducerRecord<>(TOPIC, partition, key, value);
-               record.headers().add("__TypeId__", type.getBytes());
+
+               type.ifPresent(typeId -> record.headers().add("__TypeId__", typeId.getBytes()));
                return record;
        }
 
-       Bytes serializeClientMessage(String key, Long value)
+       Bytes serializeClientMessage(String key, int value)
        {
-               TestClientMessage message = new TestClientMessage(key, value.toString());
+               TestClientMessage message = new TestClientMessage(key, Integer.toString(value));
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }
 
-       Bytes serializeGreeting(String key, Long value)
+       Bytes serializeGreeting(String key)
        {
                TestGreeting message = new TestGreeting(key, LocalDateTime.now());
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }
 
-       Bytes serializeFooMessage(String key, Long value)
+       Bytes serializeFooMessage(String key, int value)
        {
-               TestFooMessage message = new TestFooMessage(key, value);
+               TestFooMessage message = new TestFooMessage(key, (long)value);
                return new Bytes(valueSerializer.serialize(TOPIC, message));
        }