Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen
authorKai Moritz <kai@juplo.de>
Sun, 5 Jun 2022 05:14:54 +0000 (07:14 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jun 2022 12:20:40 +0000 (14:20 +0200)
src/main/java/de/juplo/kafka/FooMessage.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java
new file mode 100644 (file)
index 0000000..70c2544
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class FooMessage
+{
+  private String client;
+  private Long timestamp;
+}
index c3fbe45..3c97bc3 100644 (file)
@@ -102,7 +102,6 @@ class ApplicationTests
                                .isTrue();
        }
 
-
        @Test
        void mixedMessages()
        {
@@ -128,6 +127,55 @@ class ApplicationTests
                                .isTrue();
        }
 
+       @Test
+       void unknownMessagesAndPoisonPillsForAllTypes()
+       {
+               send100Messages((key, counter) ->
+               {
+                       switch((int) (counter%10))
+                       {
+                               case 1:
+                                       return serializeString("BOOM!", "message");
+                               case 2:
+                                       return serializeString("BOOM!", "greeting");
+                               case 3:
+                                       return serializeString("BOOM!", "foo");
+                               case 4:
+                                       return serializeString("BOOM!", "bar");
+                               case 5:
+                                       return serializeAsFooMessage(key);
+                               case 6:
+                               case 7:
+                                       return serializeAsGreeting(key);
+                               default:
+                                       return serializeAsClientMessage(key, counter);
+                       }
+               });
+
+               await("100 records received")
+                               .atMost(Duration.ofSeconds(30))
+                               .until(() -> received.size() == 50);
+
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .untilAsserted(() ->
+                               {
+                                       // UNSCHÖN:
+                                       // Funktioniert nur, weil nach den Nachrichten, die einen
+                                       // Deserialisierungs-Fehler auslösen noch valide Nachrichten
+                                       // gelesen werden.
+                                       // GRUND:
+                                       // Der MessageHandler sieht die Offsets der Fehlerhaften
+                                       // Nachrichten nicht!
+                                       checkSeenOffsetsForProgress();
+                                       compareToCommitedOffsets(newOffsets);
+                               });
+
+               assertThat(endlessConsumer.isRunning())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
+       }
+
        @Test
        void commitsCurrentOffsetsOnDeserializationError()
        {
@@ -199,7 +247,7 @@ class ApplicationTests
                        Long expected = offsetsToCheck.get(tp) + 1;
                        log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
                        assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
+                                       .describedAs("Committed offset corresponds to the offset of the consumer for " + tp)
                                        .isEqualTo(expected);
                });
        }
@@ -302,6 +350,14 @@ class ApplicationTests
                return new BytesAndType(serialize(greeting), "greeting");
        }
 
+       BytesAndType serializeAsFooMessage(Integer key)
+       {
+               FooMessage foo = new FooMessage();
+               foo.setClient(key.toString());
+               foo.setTimestamp(System.currentTimeMillis());
+               return new BytesAndType(serialize(foo), "foo");
+       }
+
        BytesAndType serializeString(String message, String messageType)
        {
                return new BytesAndType(new Bytes(message.getBytes()), messageType);